oku_fs/fs/
core.rs

1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4#[cfg(feature = "fuse")]
5use fuse_mt::spawn_mount;
6use iroh::protocol::ProtocolHandler;
7use iroh_blobs::store::Store;
8use iroh_docs::Author;
9use log::{error, info};
10#[cfg(feature = "fuse")]
11use miette::IntoDiagnostic;
12use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
13#[cfg(feature = "fuse")]
14use std::collections::HashMap;
15use std::path::PathBuf;
16#[cfg(feature = "fuse")]
17use std::sync::Arc;
18#[cfg(feature = "fuse")]
19use std::sync::RwLock;
20#[cfg(feature = "fuse")]
21use tokio::runtime::Handle;
22use tokio::sync::watch::{self};
23
24impl OkuFs {
25    /// Obtain the private key of the node's authorship credentials.
26    ///
27    /// # Return
28    ///
29    /// The private key of the node's authorship credentials.
30    pub async fn get_author(&self) -> anyhow::Result<Author> {
31        let default_author_id = self.default_author().await;
32
33        self.docs
34            .client()
35            .authors()
36            .export(default_author_id)
37            .await
38            .ok()
39            .flatten()
40            .ok_or(anyhow::anyhow!(
41                "Missing private key for default author ({}).",
42                crate::fs::util::fmt_short(default_author_id)
43            ))
44    }
45
46    /// Starts an instance of an Oku file system.
47    /// In the background, an Iroh node is started if none is running, or is connected to if one is already running.
48    ///
49    /// # Arguments
50    ///
51    /// * `handle` - If compiling with the `fuse` feature, a Tokio runtime handle is required.
52    ///
53    /// # Returns
54    ///
55    /// A running instance of an Oku file system.
56    pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
57        let endpoint = iroh::Endpoint::builder()
58            .discovery_n0()
59            .discovery_dht()
60            .discovery_local_network()
61            .bind()
62            .await?;
63        let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
64            .await?
65            .build(&endpoint);
66        let gossip = iroh_gossip::net::Gossip::builder()
67            .spawn(endpoint.clone())
68            .await?;
69        let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
70            .spawn(&blobs, &gossip)
71            .await?;
72
73        let router = iroh::protocol::Router::builder(endpoint.clone())
74            .accept(iroh_blobs::ALPN, blobs.clone())
75            .accept(iroh_gossip::ALPN, gossip.clone())
76            .accept(iroh_docs::ALPN, docs.clone())
77            .spawn()
78            .await?;
79        info!(
80            "Default author ID is {} … ",
81            crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
82        );
83
84        let (replica_sender, _replica_receiver) = watch::channel(());
85        let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
86
87        let oku_fs = Self {
88            endpoint,
89            blobs,
90            docs,
91            router,
92            replica_sender,
93            okunet_fetch_sender,
94            #[cfg(feature = "fuse")]
95            fs_handles: Arc::new(RwLock::new(HashMap::new())),
96            #[cfg(feature = "fuse")]
97            newest_handle: Arc::new(RwLock::new(0)),
98            #[cfg(feature = "fuse")]
99            handle: handle.clone(),
100            dht: mainline::Dht::server()?.as_async(),
101        };
102        let oku_fs_clone = oku_fs.clone();
103        tokio::spawn(async move {
104            loop {
105                tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
106                match oku_fs_clone.announce_replicas().await {
107                    Ok(_) => info!("Announced all replicas … "),
108                    Err(e) => error!("{}", e),
109                }
110                match oku_fs_clone.refresh_users().await {
111                    Ok(_) => info!("Refreshed OkuNet database … "),
112                    Err(e) => error!("{}", e),
113                }
114                tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
115            }
116        });
117        Ok(oku_fs.clone())
118    }
119
    /// Shuts down the Oku file system.
    ///
    /// Consumes the instance and shuts its components down in a fixed order: the endpoint is
    /// closed, then the router, then `docs`, then `blobs`, and finally the blob store.
    ///
    /// This function returns nothing; the results of closing the endpoint and shutting down the
    /// router are discarded, so failures in those two steps are not reported to the caller.
    ///
    /// NOTE(review): nothing here visibly stops the background task spawned by `start` — confirm
    /// whether that task is expected to outlive this call.
    pub async fn shutdown(self) {
        // Best-effort: errors from these two steps are intentionally ignored.
        let _ = self.endpoint.close().await;
        let _ = self.router.shutdown().await;
        self.docs.shutdown().await;
        self.blobs.shutdown().await;
        // The underlying blob store is shut down last, after the blobs protocol itself.
        self.blobs.store().shutdown().await;
    }
128
129    /// Retrieve the content of a document entry.
130    ///
131    /// # Arguments
132    ///
133    /// * `entry` - An entry in an Iroh document.
134    ///
135    /// # Returns
136    ///
137    /// The content of the entry, as raw bytes.
138    pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
139        self.content_bytes_by_hash(&entry.content_hash()).await
140    }
141
142    /// Retrieve the content of a document entry by its hash.
143    ///
144    /// # Arguments
145    ///
146    /// * `hash` - The content hash of an Iroh document.
147    ///
148    /// # Returns
149    ///
150    /// The content of the entry, as raw bytes.
151    pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
152        self.blobs.client().read_to_bytes(*hash).await
153    }
154
155    /// Determines the oldest timestamp of a file entry in any replica stored locally.
156    ///
157    /// # Returns
158    ///
159    /// The oldest timestamp in any local replica, in microseconds from the Unix epoch.
160    pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
161        let replicas = self.list_replicas().await?;
162        let mut timestamps: Vec<u64> = Vec::new();
163        for (replica, _capability_kind) in replicas {
164            timestamps.push(
165                self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
166                    .await?,
167            );
168        }
169        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
170    }
171
172    /// Determines the latest timestamp of a file entry in any replica stored locally.
173    ///
174    /// # Returns
175    ///
176    /// The latest timestamp in any local replica, in microseconds from the Unix epoch.
177    pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
178        let replicas = self.list_replicas().await?;
179        let mut timestamps: Vec<u64> = Vec::new();
180        for (replica, _capability_kind) in replicas {
181            timestamps.push(
182                self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
183                    .await?,
184            );
185        }
186        Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
187    }
188
189    /// Determines the size of the file system.
190    ///
191    /// # Returns
192    ///
193    /// The total size, in bytes, of the files in every replica stored locally.
194    pub async fn get_size(&self) -> miette::Result<u64> {
195        let replicas = self.list_replicas().await?;
196        let mut size = 0;
197        for (replica, _capability_kind) in replicas {
198            size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
199        }
200        Ok(size)
201    }
202
203    #[cfg(feature = "fuse")]
204    /// Mount the file system.
205    ///
206    /// # Arguments
207    ///
208    /// * `path` - The path to the file system mount point.
209    ///
210    /// # Returns
211    ///
212    /// A handle referencing the mounted file system; joining or dropping the handle will unmount the file system and shutdown the node.
213    pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
214        spawn_mount(fuse_mt::FuseMT::new(self.clone(), 1), path, &[]).into_diagnostic()
215    }
216}