1use super::core::home_replica_filters;
2use crate::fs::util::entry_key_to_path;
3use crate::{
4 database::{
5 core::DATABASE,
6 posts::core::{OkuNote, OkuPost},
7 users::OkuUser,
8 },
9 fs::OkuFs,
10};
11use dashmap::DashMap;
12use iroh_blobs::Hash;
13use iroh_docs::rpc::client::docs::Entry;
14use iroh_docs::AuthorId;
15use log::error;
16use miette::IntoDiagnostic;
17use rayon::iter::{
18 FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
19};
20use std::{
21 collections::{HashMap, HashSet},
22 path::{Path, PathBuf},
23 sync::atomic::AtomicUsize,
24};
25use url::Url;
26
27impl OkuFs {
28 pub async fn posts(&self) -> Option<Vec<OkuPost>> {
34 let post_files = self
35 .read_directory(&self.home_replica().await?, Path::new("/posts/"))
36 .await
37 .ok()
38 .unwrap_or_default();
39 Some(
40 post_files
41 .par_iter()
42 .filter(|(entry, _)| {
43 entry_key_to_path(entry.key())
44 .map(|x| matches!(x.extension(), Some(y) if y == "toml"))
45 .unwrap_or(false)
46 })
47 .filter_map(|(entry, bytes)| {
48 toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
49 .ok()
50 .map(|x| OkuPost {
51 entry: entry.clone(),
52 note: x,
53 })
54 })
55 .collect(),
56 )
57 }
58
59 pub async fn all_posts(&self) -> HashSet<OkuPost> {
65 let mut posts = HashSet::<_>::from_par_iter(self.posts().await.unwrap_or_default());
66 posts.extend(DATABASE.get_posts().unwrap_or_default());
67 posts
68 }
69
70 pub async fn posts_with_tags(&self, posts: &[OkuPost], tags: &HashSet<String>) -> Vec<OkuPost> {
82 posts
83 .to_owned()
84 .into_par_iter()
85 .filter(|x| !x.note.tags.is_disjoint(tags))
86 .collect()
87 }
88
89 pub async fn all_tags(&self, posts: &HashSet<OkuPost>) -> HashSet<String> {
99 HashSet::<_>::from_par_iter(posts.into_par_iter().flat_map(|x| x.note.tags.clone()))
100 }
101
102 pub async fn count_tags(&self, posts: &HashSet<OkuPost>) -> HashMap<String, usize> {
112 let result: DashMap<String, AtomicUsize> = DashMap::new();
113 posts.into_par_iter().for_each(|x| {
114 x.note.tags.par_iter().for_each(|y| match result.get(y) {
115 None => {
116 result.insert(y.to_owned(), AtomicUsize::new(1));
117 }
118 Some(v) => {
119 v.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120 }
121 });
122 });
123 result
124 .into_par_iter()
125 .map(|(k, v)| (k, v.into_inner()))
126 .collect()
127 }
128
129 pub async fn post(&self, path: &PathBuf) -> miette::Result<OkuPost> {
139 let namespace_id = self
140 .home_replica()
141 .await
142 .ok_or(miette::miette!("Home replica not set … "))?;
143 match self.read_file(&namespace_id, path).await {
144 Ok(bytes) => {
145 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
146 .into_diagnostic()?;
147 Ok(OkuPost {
148 entry: self.get_entry(&namespace_id, path).await?,
149 note,
150 })
151 }
152 Err(e) => Err(miette::miette!("{}", e)),
153 }
154 }
155
156 pub async fn post_from_entry(&self, entry: &Entry) -> miette::Result<OkuPost> {
166 let bytes = self
167 .content_bytes(entry)
168 .await
169 .map_err(|e| miette::miette!("{}", e))?;
170 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
171 .into_diagnostic()?;
172 Ok(OkuPost {
173 entry: entry.clone(),
174 note,
175 })
176 }
177
178 pub async fn posts_from_user(&self, user: &OkuUser) -> miette::Result<Vec<OkuPost>> {
188 let mut posts: Vec<_> = Vec::new();
189 for post in user.posts.clone() {
190 posts.push(self.post_from_entry(&post).await?);
191 }
192 Ok(posts)
193 }
194
195 pub async fn create_or_modify_post(
213 &self,
214 url: &Url,
215 title: &String,
216 body: &String,
217 tags: &HashSet<String>,
218 ) -> miette::Result<Hash> {
219 let home_replica_id = self
220 .home_replica()
221 .await
222 .ok_or(miette::miette!("No home replica set … "))?;
223 let new_note = OkuNote {
224 url: url.clone(),
225 title: title.to_string(),
226 body: body.to_string(),
227 tags: tags.clone(),
228 };
229 let post_path = &new_note.post_path().into();
230 self.create_or_modify_file(
231 &home_replica_id,
232 post_path,
233 toml::to_string_pretty(&new_note).into_diagnostic()?,
234 )
235 .await
236 }
237
238 pub async fn delete_post(&self, path: &PathBuf) -> miette::Result<usize> {
248 let home_replica_id = self
249 .home_replica()
250 .await
251 .ok_or(miette::miette!("No home replica set … "))?;
252 self.delete_file(&home_replica_id, path).await
253 }
254
255 pub async fn fetch_post(
267 &self,
268 author_id: &AuthorId,
269 path: &PathBuf,
270 ) -> miette::Result<OkuPost> {
271 let ticket = self
272 .resolve_author_id(author_id)
273 .await
274 .map_err(|e| miette::miette!("{}", e))?;
275 let namespace_id = ticket.capability.id();
276 match self
277 .fetch_file_with_ticket(&ticket, path, &Some(home_replica_filters()))
278 .await
279 {
280 Ok(bytes) => {
281 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
282 .into_diagnostic()?;
283 if let Err(e) = self
284 .fetch_post_embeddings(&ticket, author_id, note.url.as_ref())
285 .await
286 {
287 error!("{e}")
288 }
289 Ok(OkuPost {
290 entry: self.get_entry(&namespace_id, path).await?,
291 note,
292 })
293 }
294 Err(e) => Err(miette::miette!("{}", e)),
295 }
296 }
297
298 pub async fn get_or_fetch_post(
310 &self,
311 author_id: &AuthorId,
312 path: &PathBuf,
313 ) -> miette::Result<OkuPost> {
314 match DATABASE.get_post(author_id, path).ok().flatten() {
315 Some(post) => Ok(post),
316 None => self.fetch_post(author_id, path).await,
317 }
318 }
319}