|
对于最新的稳定版本,请使用 Spring Data Cassandra 5.0.4! |
持久化实体
CassandraTemplate 类(及其响应式变体 ReactiveCassandraTemplate)位于 org.springframework.data.cassandra 包中,是 Spring 对 Cassandra 支持的核心类,提供了丰富的功能集用于与数据库交互。
该模板提供了便捷的操作方法,用于在 Cassandra 中创建、更新、删除和查询数据,并在您的领域对象与 Cassandra 表中的行之间提供映射。
| 一旦配置完成,模板实例就是线程安全的,可以在多个实例之间重复使用。 |
Cassandra 中的行与应用程序领域类之间的映射是通过委托给 CassandraConverter 接口的实现来完成的。
Spring 提供了一个默认实现 MappingCassandraConverter,但你也可以编写自己的自定义转换器。
有关更详细的信息,请参阅
Cassandra 转换 章节。
CassandraTemplate 类实现了 CassandraOperations 接口,其响应式变体 ReactiveCassandraTemplate 则实现了 ReactiveCassandraOperations 接口。
尽可能地,[Reactive]CassandraOperations 中的方法均以 Cassandra 中已有的方法命名,以便让熟悉 Cassandra 的开发者对这套 API 感到亲切。
例如,您可以找到诸如 select、insert、delete 和 update 等方法。
其设计目标是尽可能简化在基础 Cassandra 驱动程序和 [Reactive]CassandraOperations 之间的切换使用。
这两个 API 之间的一个主要区别在于,CassandraOperations 可以传入领域对象,而无需传入 CQL 和查询对象。
引用 [Reactive]CassandraTemplate 实例上的操作的首选方式是通过 [Reactive]CassandraOperations 接口。 |
[Reactive]CassandraTemplate 使用的默认转换器实现是 MappingCassandraConverter。
虽然 MappingCassandraConverter 可以使用额外的元数据来指定对象到行的映射,但它也可以通过一些字段和表名映射的约定,来转换不包含额外元数据的对象。
这些约定以及映射注解的使用在“映射”章节中有详细说明。
[Reactive]CassandraTemplate 的另一个核心特性是将 Cassandra Java 驱动程序抛出的异常转换为 Spring 可移植的数据访问异常体系结构中的异常。
更多信息请参见
异常转换一节。
模板 API 提供了不同的执行模型风格。
基础的 CassandraTemplate 使用阻塞式(命令式-同步)执行模型。
您可以使用 AsyncCassandraTemplate 进行异步执行,并与 ListenableFuture 实例配合使用,或者使用 ReactiveCassandraTemplate 进行响应式执行。 |
实例化CassandraTemplate
CassandraTemplate 应始终配置为 Spring Bean,尽管我们之前展示了一个可以直接实例化它的示例。
然而,由于我们假定是在构建 Spring 模块的上下文中,因此默认存在 Spring 容器。
获取 CassandraTemplate 有两种方式,具体取决于你如何加载 Spring 的 ApplicationContext:
自动装配
您可以将 [Reactive]CassandraOperations 自动装配到您的项目中,如下例所示:
-
Imperative
-
Reactive
@Autowired
private CassandraOperations cassandraOperations;
@Autowired
private ReactiveCassandraOperations reactiveCassandraOperations;
与所有 Spring 自动装配一样,这假设在 [Reactive]CassandraOperations 中仅存在一个类型为 ApplicationContext 的 bean。
如果你有多个 [Reactive]CassandraTemplate bean(例如在同一项目中使用多个 keyspace 时就会出现这种情况),那么你可以使用 @Qualifier 注解来指定你想要自动装配的 bean。
-
Imperative
-
Reactive
@Autowired
@Qualifier("keyspaceOneTemplateBeanId")
private CassandraOperations cassandraOperations;
@Autowired
@Qualifier("keyspaceOneTemplateBeanId")
private ReactiveCassandraOperations reactiveCassandraOperations;
使用 ApplicationContext 进行 Bean 查找
您也可以从 [Reactive]CassandraTemplate 中查找 ApplicationContext bean,如下例所示:
-
Imperative
-
Reactive
CassandraOperations cassandraOperations = applicationContext.getBean("cassandraTemplate", CassandraOperations.class);
ReactiveCassandraOperations cassandraOperations = applicationContext.getBean("ReactiveCassandraOperations", ReactiveCassandraOperations.class);
查询行
你可以使用 Query 和 Criteria 类来表达你的查询,这些类的方法名称反映了 Cassandra 原生谓词操作符的名称,例如 lt、lte、is 等。
Query 和 Criteria 类采用流式 API 风格,因此你可以轻松地将多个方法条件和查询链式连接起来,同时保持代码易于理解。
在 Java 中创建 Query 和 Criteria 实例时,会使用静态导入以提高可读性。
查询表中的行
在前面的章节中,我们了解了如何通过 selectOneById 上的 [Reactive]CassandraTemplate 方法来检索单个对象。
这样做会返回一个单独的领域对象。
我们也可以查询多行数据,并将其作为领域对象列表返回。
假设我们在一张表中存储了多个 Person 对象,每个对象都包含姓名和年龄值,并且每个人都有一个账户余额,那么现在我们可以使用以下代码执行查询:
[Reactive]CassandraTemplate 查询行-
Imperative
-
Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
List<Person> result = cassandraTemplate.select(query(where("age").is(50))
.and(where("balance").gt(1000.00d)).withAllowFiltering(), Person.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Flux<Person> result = reactiveCassandraTemplate.select(query(where("age").is(50))
.and(where("balance").gt(1000.00d)).withAllowFiltering(), Person.class);
select、selectOne和stream方法接受一个Query对象作为参数。
该对象定义了执行查询所使用的条件和选项。
条件是通过使用Criteria对象指定的,该对象具有一个名为where的静态工厂方法,用于实例化新的Criteria对象。
我们建议使用org.springframework.data.cassandra.core.query.Criteria.where和Query.query的静态导入,以提高查询的可读性。
该查询应返回一个符合指定条件的 Person 对象列表。
Criteria 类具有以下方法,这些方法对应于 Apache Cassandra 中提供的运算符:
Criteria 类的方法
-
CriteriaDefinitiongt(Object value):使用>运算符创建一个条件。 -
CriteriaDefinitiongte(Object value):使用>=运算符创建一个条件。 -
CriteriaDefinitionin(Object… values):使用IN操作符为可变参数创建一个条件。 -
CriteriaDefinitionin(Collection<?> collection):使用集合创建一个采用IN运算符的条件。 -
CriteriaDefinitionis(Object value):通过使用字段匹配(column = value)创建一个条件。 -
CriteriaDefinitionlt(Object value):使用<运算符创建一个条件。 -
CriteriaDefinitionlte(Object value):使用⇐运算符创建一个条件。 -
CriteriaDefinitionlike(Object value):使用LIKE运算符创建一个条件。 -
CriteriaDefinitioncontains(Object value):使用CONTAINS运算符创建一个条件。 -
CriteriaDefinitioncontainsKey(Object key):使用CONTAINS KEY运算符创建一个条件。
Criteria 一旦创建后就是不可变的。
Query 类的方法
Query 类提供了一些额外的方法,可用于为查询设置选项:
-
Queryby(CriteriaDefinition… criteria):用于创建一个Query对象。 -
Query和(CriteriaDefinition criteria):用于向查询中添加额外的条件。 -
Querycolumns(Columns columns):用于定义要包含在查询结果中的列。 -
Querylimit(Limit limit):用于将返回结果的大小限制为所提供的限制值(用于SELECT语句的限制)。 -
Querylimit(long limit):用于将返回结果的大小限制为所提供的限制值(用于SELECT语句的限制)。 -
QuerypageRequest(Pageable pageRequest):用于将Sort、PagingState和fetchSize与查询关联(用于分页)。 -
QuerypagingState(ByteBuffer pagingState):用于将一个ByteBuffer与查询关联(用于分页)。 -
QueryqueryOptions(QueryOptions queryOptions):用于将QueryOptions与查询关联。 -
Querysort(Sort sort):用于为查询结果提供排序定义。 -
QuerywithAllowFiltering():用于生成带有ALLOW FILTERING的查询。
Query 一旦创建后即为不可变对象。
调用其方法会创建新的不可变(中间)Query 对象。
查询行的方法
Query 类具有以下返回行的方法:
-
List<T>select(Query query, Class<T> entityClass):从表中查询类型为T的对象列表。 -
TselectOne(Query query, Class<T> entityClass):从表中查询类型为T的单个对象。 -
Slice<T>slice(Query query, Class<T> entityClass):通过从表中查询类型为Slice的对象的T来开始或继续分页。 -
Stream<T>stream(Query query, Class<T> entityClass):从表中查询类型为T的对象流。 -
List<T>select(String cql, Class<T> entityClass):通过提供一条 CQL 语句,从表中执行即席查询,以获取类型为T的对象列表。 -
TselectOne(String cql, Class<T> entityClass):通过提供 CQL 语句,从表中执行即席查询以获取单个类型为T的对象。 -
Stream<T>stream(String cql, Class<T> entityClass):通过提供 CQL 语句,从表中执行即席查询,返回类型为T的对象流。
查询方法必须指定所返回的目标类型 T。
流畅模板 API
[Reactive]CassandraOperations 接口是在与 Apache Cassandra 进行更底层交互时的核心组件之一。
它提供了大量方法。
每个方法都有多个重载版本。
其中大多数覆盖了 API 中可选(可为 null)的部分。
FluentCassandraOperations 及其响应式变体 ReactiveFluentCassandraOperations 为 [Reactive]CassandraOperations 的常用方法提供了更精简的接口,从而实现更具可读性的流畅 API。
入口方法(query(…)、insert(…)、update(…) 和 delete(…))采用基于要执行操作的自然命名方案。
从入口方法开始,该 API 的设计仅提供上下文相关的方法,引导开发者调用最终触发实际 [Reactive]CassandraOperations 操作的终止方法。
以下示例展示了该流畅 API:
-
Imperative
-
Reactive
List<SWCharacter> all = ops.query(SWCharacter.class)
.inTable("star_wars") (1)
.all();
| 1 | 如果 SWCharacter 使用 @Table 注解定义了表名,或者使用类名作为表名不会造成问题,则可跳过此步骤。 |
Flux<SWCharacter> all = ops.query(SWCharacter.class)
.inTable("star_wars") (1)
.all();
| 1 | 如果 SWCharacter 使用 @Table 注解定义了表名,或者使用类名作为表名不会造成问题,则可跳过此步骤。 |
如果 Cassandra 中的一张表包含不同类型实体,例如在 Jedi 表中包含 SWCharacters 实体,你可以使用不同的类型来映射查询结果。
你可以使用 as(Class<?> targetType) 将结果映射到不同的目标类型,而 query(Class<?> entityType) 仍然用于指定查询和表名。
以下示例使用了 query 和 as 方法:
-
Imperative
-
Reactive
List<Jedi> all = ops.query(SWCharacter.class) (1)
.as(Jedi.class) (2)
.matching(query(where("jedi").is(true)))
.all();
| 1 | 查询字段被映射到 SWCharacter 类型。 |
| 2 | 结果行被映射为 Jedi。 |
Flux<Jedi> all = ops.query(SWCharacter.class) (1)
.as(Jedi.class) (2)
.matching(query(where("jedi").is(true)))
.all();
| 1 | 查询字段被映射到 SWCharacter 类型。 |
| 2 | 结果行被映射为 Jedi。 |
你可以通过 xref page 仅提供 ../repositories/projections.html 类型,直接将投影(Projections)应用到结果文档上。 |
终止方法(first()、one()、all() 和 stream())用于处理在检索单个实体与以 List 或 Stream 等形式检索多个实体之间的切换,以及类似的操作。
新的流式模板 API 方法(即 query(…)、insert(…)、update(…) 和 delete(…))使用了有效的线程安全辅助对象来构建 CQL 语句。
然而,这种设计也带来了额外的代价:由于其基于 CQL 语句各个组件的 final 字段以及在每次变更时进行构造,会导致 JVM 年轻代堆内存产生额外的开销。
因此,在可能插入或删除大量对象时(例如在循环内部),应格外小心。 |
向量搜索查询
投影(Projections)是查询以不同于实体形式返回数据的基础。
虽然 Cassandra 的主要使用场景遵循键值模型——即存储什么就检索什么——但向量搜索则有所不同。
执行向量搜索查询通常会得到聚合或类似报表的结果集。
典型的查询会返回某种形式的内容(例如一个 text 列),以及该内容与实际向量的得分(或距离)。
考虑以下领域模型:
@Table
class Comments {
@Id UUID id;
String comment;
@VectorType(dimensions = 5)
@SaiIndexed Vector vector;
}
class CommentSearch {
String comment;
float similarity;
}
Comments 是定义向量列和 comment 列的领域类型。
运行向量搜索需要使用 ANN 排序,并通常会定义一个相似度函数,以确定其与给定向量之间的距离。
Columns columns = Columns.from("comment") (1)
.select("vector", builder -> builder.similarity(vector)
.cosine().as("similarity")); (2)
Query query = Query.select(columns)
.limit(3)
.sort(VectorSort.ann("vector", vector)); (3)
template.query(Comments.class)
.as(CommentSearch.class) (4)
.matching(query)
.all();
| 1 | 选择要查询的列。 |
| 2 | 包含一个对 similarity_cosine(vector, […]) 的函数调用。Columns.select(…) 使用选择器构建器定制器来配置实际的选择内容。
请务必声明一个别名,以便将结果映射到 CommentSearch.similarity,因为结果映射使用的是列名。 |
| 3 | 使用 VectorSort.ann(…) 来定义相对于给定向量的排序顺序。 |
| 4 | 定义用于映射结果的目标类型。
CommentSearch 类型是一个结果投影类型,用于定义映射结果的列。 |
保存、更新和删除行
[Reactive]CassandraTemplate 为您提供了一种简单的方式来保存、更新和删除您的领域对象,并将这些对象映射到 Cassandra 中管理的表。
插入和更新行的方法
[Reactive]CassandraTemplate 提供了多个便捷方法用于保存和插入您的对象。
若要对转换过程进行更细粒度的控制,您可以向 Converter 注册 Spring 的 MappingCassandraConverter 实例
(例如,Converter<Row, Person>)。
插入操作与更新操作的区别在于,INSERT 操作不会插入 null 值。 |
使用 INSERT 操作的简单场景是保存一个 POJO。
在这种情况下,表名由类的简单名称(而非全限定类名)确定。
可以通过使用映射元数据来覆盖用于存储该对象的表。
在插入或更新时,必须设置 id 属性。
Apache Cassandra 无法生成 ID。
以下示例使用了 save 操作并获取其内容:
[Reactive]CassandraTemplate 插入和检索对象-
Imperative
-
Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Person bob = new Person("Bob", 33);
cassandraTemplate.insert(bob);
Person queriedBob = cassandraTemplate.selectOneById(query(where("age").is(33)), Person.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import static org.springframework.data.cassandra.core.query.Query.query;
…
Person bob = new Person("Bob", 33);
cassandraTemplate.insert(bob);
Mono<Person> queriedBob = reactiveCassandraTemplate.selectOneById(query(where("age").is(33)), Person.class);
您可以使用以下操作来插入和保存:
-
voidinsert(Object objectToSave):将对象插入到 Apache Cassandra 表中。 -
WriteResultinsert(Object objectToSave, InsertOptions options):将对象插入到 Apache Cassandra 表中,并应用InsertOptions。
您可以使用以下更新操作:
-
voidupdate(Object objectToSave):更新 Apache Cassandra 表中的对象。 -
WriteResultupdate(Object objectToSave, UpdateOptions options):更新 Apache Cassandra 表中的对象并应用UpdateOptions。
你也可以使用传统方式,自己编写 CQL 语句,如下例所示:
-
Imperative
-
Reactive
String cql = "INSERT INTO person (age, name) VALUES (39, 'Bob')";
cassandraTemplate().getCqlOperations().execute(cql);
String cql = "INSERT INTO person (age, name) VALUES (39, 'Bob')";
Mono<Boolean> applied = reactiveCassandraTemplate.getReactiveCqlOperations().execute(cql);
在使用 InsertOptions 和 UpdateOptions 时,您还可以配置额外的选项,例如 TTL、一致性级别和轻量级事务。
我的行被插入到了哪张表中?
你可以通过两种方式管理用于操作表的表名。
默认的表名是简单类名转换为首字母小写的形式。
因此,com.example.Person 类的实例将被存储在 person 表中。
第二种方式是在 @Table 注解中指定表名。
在批处理中插入、更新和删除单个对象
Cassandra 协议支持通过使用批处理(batch)在一次操作中插入多行数据。
[Reactive]CassandraTemplate 接口中的以下方法支持此功能:
-
batchOps:创建一个新的[Reactive]CassandraBatchOperations以填充批处理。
[Reactive]CassandraBatchOperations
-
insert:接收单个对象、对象数组(可变参数)或对象的Iterable进行插入。 -
update:接收单个对象、对象数组(可变参数)或对象的Iterable以进行更新。 -
delete:接受单个对象、一个数组(可变参数)或一个要删除的对象的Iterable。 -
withTimestamp:为批处理应用一个 TTL(生存时间)。 -
execute:执行批处理。
在表中更新行
对于更新操作,您可以选择更新多行数据。
以下示例展示了通过使用 + 赋值操作,向账户余额一次性添加 $50.00 奖金,从而更新单个账户对象:
[Reactive]CasandraTemplate 更新行-
Imperative
-
Reactive
import static org.springframework.data.cassandra.core.query.Criteria.where;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
…
boolean applied = cassandraTemplate.update(Query.query(where("id").is("foo")),
Update.create().increment("balance", 50.00), Account.class);
import static org.springframework.data.cassandra.core.query.Criteria.where;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
…
Mono<Boolean> wasApplied = reactiveCassandraTemplate.update(Query.query(where("id").is("foo")),
Update.create().increment("balance", 50.00), Account.class);
除了前面讨论过的 Query 之外,我们还通过使用 Update 对象来提供更新定义。
Update 类提供了与 Apache Cassandra 中可用的更新赋值操作相匹配的方法。
大多数方法都会返回 Update 对象,以提供流畅的 API,便于代码风格的编写。
执行行更新的方法
update 方法可以更新行,如下所示:
-
booleanupdate(Query query, Update update, Class<?> entityClass):更新 Apache Cassandra 表中的一组对象。
Update 类的方法
Update 类可以配合少量“语法糖”使用,因为其方法设计为可链式调用。
此外,你可以通过静态方法 Update 并结合静态导入,快速创建一个新的 public static Update update(String key, Object value) 实例。
Update 类包含以下方法:
-
AddToBuilderaddTo(String columnName)AddToBuilder入口点:-
更新
prepend(Object value):通过使用+更新赋值操作,将一个集合值添加到现有集合的开头。 -
更新
prependAll(Object… values):使用+更新赋值操作,将所有集合值添加到现有集合的开头。 -
更新
append(Object value):通过使用+更新赋值操作,将一个集合值追加到现有集合中。 -
更新
append(Object… values):使用+更新赋值操作,将所有集合值追加到现有集合中。 -
更新
entry(Object key, Object value):使用+更新赋值操作添加一个映射条目。 -
更新
addAll(Map<? extends Object, ? extends Object> map):使用+更新赋值操作将所有映射条目添加到该映射中。
-
-
Updateremove(String columnName, Object value):使用-更新赋值操作从集合中移除该值。 -
Updateclear(String columnName):清空集合。 -
Updateincrement(String columnName, Number delta):使用+更新赋值进行更新。 -
Updatedecrement(String columnName, Number delta):使用-更新赋值进行更新。 -
Updateset(String columnName, Object value):使用=更新赋值进行更新。 -
SetBuilderset(String columnName)SetBuilder入口点:-
更新
atIndex(int index).to(Object value):使用=更新赋值操作,将集合中指定索引处的元素设置为给定值。 -
更新
atKey(String object).to(Object value):将给定键的映射条目设置为通过=更新赋值操作指定的值。
-
以下列表展示了一些更新示例:
// UPDATE … SET key = 'Spring Data';
Update.update("key", "Spring Data")
// UPDATE … SET key[5] = 'Spring Data';
Update.empty().set("key").atIndex(5).to("Spring Data");
// UPDATE … SET key = key + ['Spring', 'DATA'];
Update.empty().addTo("key").appendAll("Spring", "Data");
请注意,Update 一旦创建就是不可变的。
调用其方法会创建新的不可变(中间)Update 对象。
删除行的方法
您可以使用以下重载方法从数据库中删除一个对象:
-
booleandelete(Query query, Class<?> entityClass):删除由Query选中的对象。 -
Tdelete(T entity):删除给定的对象。 -
Tdelete(T entity, QueryOptions queryOptions):删除给定对象,并应用QueryOptions。 -
booleandeleteById(Object id, Class<?> entityClass):使用给定的 ID 删除对象。
乐观锁
@Version 注解在 Cassandra 上下文中提供了与 JPA 类似的语法,并确保更新仅应用于具有匹配版本号的行。
乐观锁利用 Cassandra 的轻量级事务来有条件地插入、更新和删除行。
因此,INSERT 语句会使用 IF NOT EXISTS 条件执行。
对于更新和删除操作,版本属性的实际值会被添加到 UPDATE 条件中,这样如果在此期间其他操作已修改了该行,则当前修改将不会生效。
在这种情况下,会抛出一个 OptimisticLockingFailureException 异常。
以下示例展示了这些特性:
@Table
class Person {
@Id String id;
String firstname;
String lastname;
@Version Long version;
}
Person daenerys = template.insert(new Person("Daenerys")); (1)
Person tmp = template.findOne(query(where("id").is(daenerys.getId())), Person.class); (2)
daenerys.setLastname("Targaryen");
template.save(daenerys); (3)
template.save(tmp); // throws OptimisticLockingFailureException (4)
| 1 | 初始插入文档。version 被设置为 0。 |
| 2 | 加载刚刚插入的文档。version 仍然是 0。 |
| 3 | 使用 version = 0 更新文档。
设置 lastname 并将 version 增加至 1。 |
| 4 | 尝试更新先前加载的文档,该文档仍具有 version = 0。
由于当前 OptimisticLockingFailureException 为 version,此操作会因 1 而失败。 |
| 乐观锁仅支持单个实体的操作,不支持批量操作。 |