NoSQL (Distributed / Big Data) Databases

Warning

The current page still doesn’t have a translation for this language.

But you can help translating it: Contributing.

FastAPI can also be integrated with any NoSQL.

Here we’ll see an example using Couchbase, a document based NoSQL database.

You can adapt it to any other NoSQL database like:

  • MongoDB
  • Cassandra
  • CouchDB
  • ArangoDB
  • ElasticSearch, etc.

Tip

There is an official project generator with FastAPI and Couchbase, all based on Docker, including a frontend and more tools: https://github.com/tiangolo/full-stack-fastapi-couchbase

Import Couchbase components

For now, don’t pay attention to the rest, only the imports:

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Define a constant to use as a “document type”

We will use it later as a fixed field type in our documents.

This is not required by Couchbase, but is a good practice that will help you afterwards.

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Add a function to get a Bucket

In Couchbase, a bucket is a set of documents, that can be of different types.

They are generally all related to the same application.

The analogy in the relational database world would be a “database” (a specific database, not the database server).

The analogy in MongoDB would be a “collection”.

In the code, a Bucket represents the main entrypoint of communication with the database.

This utility function will:

  • Connect to a Couchbase cluster (that might be a single machine).
    • Set defaults for timeouts.
  • Authenticate in the cluster.
  • Get a Bucket instance.
    • Set defaults for timeouts.
  • Return it.
  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Create Pydantic models

As Couchbase “documents” are actually just “JSON objects”, we can model them with Pydantic.

User model

First, let’s create a User model:

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

We will use this model in our path operation function, so, we don’t include in it the hashed_password.

UserInDB model

Now, let’s create a UserInDB model.

This will have the data that is actually stored in the database.

We don’t create it as a subclass of Pydantic’s BaseModel but as a subclass of our own User, because it will have all the attributes in User plus a couple more:

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Note

Notice that we have a hashed_password and a type field that will be stored in the database.

But it is not part of the general User model (the one we will return in the path operation).

Get the user

Now create a function that will:

  • Take a username.
  • Generate a document ID from it.
  • Get the document with that ID.
  • Put the contents of the document in a UserInDB model.

By creating a function that is only dedicated to getting your user from a username (or any other parameter) independent of your path operation function, you can more easily re-use it in multiple parts and also add unit tests for it:

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

f-strings

If you are not familiar with the f"userprofile::{username}", it is a Python “f-string“.

Any variable that is put inside of {} in an f-string will be expanded / injected in the string.

dict unpacking

If you are not familiar with the UserInDB(**result.value), it is using dict “unpacking”.

It will take the dict at result.value, and take each of its keys and values and pass them as key-values to UserInDB as keyword arguments.

So, if the dict contains:

  1. {
  2. "username": "johndoe",
  3. "hashed_password": "some_hash",
  4. }

It will be passed to UserInDB as:

  1. UserInDB(username="johndoe", hashed_password="some_hash")

Create your FastAPI code

Create the FastAPI app

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Create the path operation function

As our code is calling Couchbase and we are not using the experimental Python await support, we should declare our function with normal def instead of async def.

Also, Couchbase recommends not using a single Bucket object in multiple “threads”, so, we can just get the bucket directly and pass it to our utility functions:

  1. from typing import Optional
  2. from couchbase import LOCKMODE_WAIT
  3. from couchbase.bucket import Bucket
  4. from couchbase.cluster import Cluster, PasswordAuthenticator
  5. from fastapi import FastAPI
  6. from pydantic import BaseModel
  7. USERPROFILE_DOC_TYPE = "userprofile"
  8. def get_bucket():
  9. cluster = Cluster(
  10. "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
  11. )
  12. authenticator = PasswordAuthenticator("username", "password")
  13. cluster.authenticate(authenticator)
  14. bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
  15. bucket.timeout = 30
  16. bucket.n1ql_timeout = 300
  17. return bucket
  18. class User(BaseModel):
  19. username: str
  20. email: Optional[str] = None
  21. full_name: Optional[str] = None
  22. disabled: Optional[bool] = None
  23. class UserInDB(User):
  24. type: str = USERPROFILE_DOC_TYPE
  25. hashed_password: str
  26. def get_user(bucket: Bucket, username: str):
  27. doc_id = f"userprofile::{username}"
  28. result = bucket.get(doc_id, quiet=True)
  29. if not result.value:
  30. return None
  31. user = UserInDB(**result.value)
  32. return user
  33. # FastAPI specific code
  34. app = FastAPI()
  35. @app.get("/users/{username}", response_model=User)
  36. def read_user(username: str):
  37. bucket = get_bucket()
  38. user = get_user(bucket=bucket, username=username)
  39. return user

Recap

You can integrate any third party NoSQL database, just using their standard packages.

The same applies to any other external tool, system or API.