Skip to main content

oku_fs/database/posts/
operations.rs

1use super::super::core::*;
2use super::core::OkuPost;
3use super::core::POST_INDEX;
4use super::core::POST_INDEX_READER;
5use super::core::POST_INDEX_WRITER;
6use super::core::POST_SCHEMA;
7use crate::fs::util::path_to_entry_key;
8use iroh_docs::AuthorId;
9use log::error;
10use miette::IntoDiagnostic;
11use native_db::*;
12use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
13use std::{collections::HashSet, path::PathBuf};
14use tantivy::{collector::TopDocs, query::QueryParser, TantivyDocument};
15
16impl OkuDatabase {
17    /// Search OkuNet posts with a query string.
18    ///
19    /// # Arguments
20    ///
21    /// * `query_string` - The string used to query for posts.
22    ///
23    /// * `result_limit` - The maximum number of results to get (defaults to 10).
24    ///
25    /// # Returns
26    ///
27    /// A list of OkuNet posts.
28    pub fn search_posts(
29        query_string: &str,
30        result_limit: &Option<usize>,
31    ) -> miette::Result<Vec<OkuPost>> {
32        let searcher = POST_INDEX_READER.searcher();
33        let query_parser = QueryParser::for_index(
34            &POST_INDEX,
35            vec![
36                POST_SCHEMA.1["author_id"],
37                POST_SCHEMA.1["path"],
38                POST_SCHEMA.1["title"],
39                POST_SCHEMA.1["body"],
40                POST_SCHEMA.1["tag"],
41            ],
42        );
43        let query = query_parser.parse_query(query_string).into_diagnostic()?;
44        let limit = result_limit.unwrap_or(10);
45        let top_docs = searcher
46            .search(&query, &TopDocs::with_limit(limit))
47            .into_diagnostic()?;
48        Ok(top_docs
49            .par_iter()
50            .filter_map(|x| searcher.doc(x.1).ok())
51            .collect::<Vec<TantivyDocument>>()
52            .into_par_iter()
53            .filter_map(|x| TryInto::try_into(x).ok())
54            .collect())
55    }
56
57    /// Insert or update an OkuNet post.
58    ///
59    /// # Arguments
60    ///
61    /// * `post` - An OkuNet post to upsert.
62    ///
63    /// # Returns
64    ///
65    /// The previous version of the post, if one existed.
66    pub fn upsert_post(&self, post: &OkuPost) -> miette::Result<Option<OkuPost>> {
67        let rw: transaction::RwTransaction<'_> =
68            self.database.rw_transaction().into_diagnostic()?;
69        let old_value: Option<OkuPost> = rw.upsert(post.clone()).into_diagnostic()?;
70        rw.commit().into_diagnostic()?;
71
72        let mut index_writer = POST_INDEX_WRITER
73            .clone()
74            .try_lock_owned()
75            .into_diagnostic()?;
76        if let Some(old_post) = old_value.clone() {
77            index_writer.delete_term(old_post.index_term());
78        }
79        index_writer
80            .add_document(post.to_owned().into())
81            .into_diagnostic()?;
82        index_writer.commit().into_diagnostic()?;
83
84        Ok(old_value)
85    }
86
87    /// Insert or update multiple OkuNet posts.
88    ///
89    /// # Arguments
90    ///
91    /// * `posts` - A list of OkuNet posts to upsert.
92    ///
93    /// # Returns
94    ///
95    /// A list containing the previous version of each post, if one existed.
96    pub fn upsert_posts(&self, posts: &Vec<OkuPost>) -> miette::Result<Vec<Option<OkuPost>>> {
97        let rw = self.database.rw_transaction().into_diagnostic()?;
98        let old_posts: Vec<_> = posts
99            .clone()
100            .into_iter()
101            .filter_map(|post| rw.upsert(post).ok())
102            .collect();
103        rw.commit().into_diagnostic()?;
104
105        let mut index_writer = POST_INDEX_WRITER
106            .clone()
107            .try_lock_owned()
108            .into_diagnostic()?;
109        old_posts.par_iter().for_each(|old_post| {
110            if let Some(old_post) = old_post {
111                index_writer.delete_term(old_post.index_term());
112            }
113        });
114        posts.par_iter().for_each(|post| {
115            if let Err(e) = index_writer.add_document(post.clone().into()) {
116                error!("{e}");
117            }
118        });
119        index_writer.commit().into_diagnostic()?;
120
121        Ok(old_posts)
122    }
123
124    /// Delete an OkuNet post.
125    ///
126    /// # Arguments
127    ///
128    /// * `post` - An OkuNet post to delete.
129    ///
130    /// # Returns
131    ///
132    /// The deleted post.
133    pub fn delete_post(&self, post: &OkuPost) -> miette::Result<OkuPost> {
134        let rw = self.database.rw_transaction().into_diagnostic()?;
135        let removed_post = rw.remove(post.to_owned()).into_diagnostic()?;
136        rw.commit().into_diagnostic()?;
137
138        let mut index_writer = POST_INDEX_WRITER
139            .clone()
140            .try_lock_owned()
141            .into_diagnostic()?;
142        index_writer.delete_term(removed_post.index_term());
143        index_writer.commit().into_diagnostic()?;
144
145        Ok(removed_post)
146    }
147
148    /// Delete multiple OkuNet posts.
149    ///
150    /// # Arguments
151    ///
152    /// * `posts` - A list of OkuNet posts to delete.
153    ///
154    /// # Returns
155    ///
156    /// A list containing the deleted posts.
157    pub fn delete_posts(&self, posts: &[OkuPost]) -> miette::Result<Vec<OkuPost>> {
158        let rw = self.database.rw_transaction().into_diagnostic()?;
159        let removed_posts: Vec<_> = posts
160            .iter()
161            .filter_map(|post| rw.remove(post.to_owned()).ok())
162            .collect();
163        rw.commit().into_diagnostic()?;
164
165        let mut index_writer = POST_INDEX_WRITER
166            .clone()
167            .try_lock_owned()
168            .into_diagnostic()?;
169        removed_posts.par_iter().for_each(|removed_post| {
170            index_writer.delete_term(removed_post.index_term());
171        });
172        index_writer.commit().into_diagnostic()?;
173
174        Ok(removed_posts)
175    }
176
177    /// Retrieves all known OkuNet posts.
178    ///
179    /// # Returns
180    ///
181    /// A list of all known OkuNet posts.
182    pub fn get_posts(&self) -> miette::Result<Vec<OkuPost>> {
183        let r = self.database.r_transaction().into_diagnostic()?;
184        r.scan()
185            .primary()
186            .into_diagnostic()?
187            .all()
188            .into_diagnostic()?
189            .collect::<Result<Vec<_>, _>>()
190            .into_diagnostic()
191    }
192
193    /// Retrieves all known OkuNet posts by a given author.
194    ///
195    /// # Arguments
196    ///
197    /// * `author_id` - A content authorship ID.
198    ///
199    /// # Returns
200    ///
201    /// A list of all known OkuNet posts by the given author.
202    pub fn get_posts_by_author(&self, author_id: &AuthorId) -> miette::Result<Vec<OkuPost>> {
203        Ok(self
204            .get_posts()?
205            .into_par_iter()
206            .filter(|x| x.entry.author() == *author_id)
207            .collect())
208    }
209
210    /// Retrieves all known OkuNet posts by a given tag.
211    ///
212    /// # Arguments
213    ///
214    /// * `tag` - A tag.
215    ///
216    /// # Returns
217    ///
218    /// A list of all known OkuNet posts with the given tag.
219    pub fn get_posts_by_tag(&self, tag: &String) -> miette::Result<Vec<OkuPost>> {
220        Ok(self
221            .get_posts()?
222            .into_par_iter()
223            .filter(|x| x.note.tags.contains(tag))
224            .collect())
225    }
226
227    /// Retrieves all distinct tags used in OkuNet posts.
228    ///
229    /// # Returns
230    ///
231    /// A list of all tags that appear in an OkuNet post.
232    pub fn get_tags(&self) -> miette::Result<HashSet<String>> {
233        Ok(self
234            .get_posts()?
235            .into_iter()
236            .flat_map(|x| x.note.tags)
237            .collect())
238    }
239
240    /// Retrieves an OkuNet post.
241    ///
242    /// # Arguments
243    ///
244    /// * `author_id` - A content authorship ID.
245    ///
246    /// * `path` - A path to a post in the author's home replica.
247    ///
248    /// # Returns
249    ///
250    /// The OkuNet post by the given author at the given path, if one exists.
251    pub fn get_post(
252        &self,
253        author_id: &AuthorId,
254        path: &PathBuf,
255    ) -> miette::Result<Option<OkuPost>> {
256        let r = self.database.r_transaction().into_diagnostic()?;
257        let entry_key = (
258            author_id.as_bytes().to_vec(),
259            path_to_entry_key(path).to_vec(),
260        );
261        r.get().primary(entry_key).into_diagnostic()
262    }
263}