pikadick/database/
kv_store.rs

1use crate::database::Database;
2use anyhow::Context;
3use rusqlite::{
4    params,
5    OptionalExtension,
6    TransactionBehavior,
7};
8
9// K/V Store SQL
10const GET_STORE_SQL: &str = include_str!("../../sql/get_store.sql");
11const PUT_STORE_SQL: &str = include_str!("../../sql/put_store.sql");
12
13impl Database {
14    /// Get a key from the store
15    pub async fn store_get<P, K, V>(&self, prefix: P, key: K) -> anyhow::Result<Option<V>>
16    where
17        P: AsRef<[u8]>,
18        K: AsRef<[u8]>,
19        V: serde::de::DeserializeOwned,
20    {
21        let prefix = prefix.as_ref().to_vec();
22        let key = key.as_ref().to_vec();
23
24        let maybe_bytes: Option<Vec<u8>> = self
25            .access_db(move |db| {
26                db.prepare_cached(GET_STORE_SQL)?
27                    .query_row([prefix, key], |row| row.get(0))
28                    .optional()
29                    .context("failed to get value")
30            })
31            .await??;
32
33        match maybe_bytes {
34            Some(bytes) => Ok(Some(
35                bincode::deserialize(&bytes).context("failed to decode value")?,
36            )),
37            None => Ok(None),
38        }
39    }
40
41    /// Put a key in the store
42    pub async fn store_put<P, K, V>(&self, prefix: P, key: K, value: V) -> anyhow::Result<()>
43    where
44        P: AsRef<[u8]>,
45        K: AsRef<[u8]>,
46        V: serde::Serialize,
47    {
48        let prefix = prefix.as_ref().to_vec();
49        let key = key.as_ref().to_vec();
50        let value = bincode::serialize(&value).context("failed to serialize value")?;
51
52        self.access_db(move |db| {
53            let txn = db.transaction()?;
54            txn.prepare_cached(PUT_STORE_SQL)?
55                .execute(params![prefix, key, value])?;
56            txn.commit().context("failed to insert key into kv_store")
57        })
58        .await??;
59
60        Ok(())
61    }
62
63    /// Get and Put a key in the store in one action, ensuring the key is not changed between the commands.
64    pub async fn store_update<P, K, V, U>(
65        &self,
66        prefix: P,
67        key: K,
68        update_func: U,
69    ) -> anyhow::Result<()>
70    where
71        P: AsRef<[u8]>,
72        K: AsRef<[u8]>,
73        V: serde::Serialize + serde::de::DeserializeOwned,
74        U: FnOnce(Option<V>) -> V + Send + 'static,
75    {
76        let prefix = prefix.as_ref().to_vec();
77        let key = key.as_ref().to_vec();
78
79        self.access_db(move |db| {
80            let txn = db.transaction_with_behavior(TransactionBehavior::Immediate)?;
81
82            let maybe_value = txn
83                .prepare_cached(GET_STORE_SQL)?
84                .query_row(params![prefix, key], |row| row.get(0))
85                .optional()
86                .context("failed to get value")?
87                .map(|bytes: Vec<u8>| {
88                    bincode::deserialize(&bytes).context("failed to decode value")
89                })
90                .transpose()?;
91            let value = update_func(maybe_value);
92            let value = bincode::serialize(&value).context("failed to serialize value")?;
93
94            txn.prepare_cached(PUT_STORE_SQL)?
95                .execute(params![prefix, key, value])?;
96            txn.commit().context("failed to insert key into kv_store")
97        })
98        .await??;
99
100        Ok(())
101    }
102}