flat index: Add support for RQ and include cache param
(#1765)

- Add client-side changes to handle new server-side batching in 1.33
- Update images in CI
- Update 1.33 image in CI
- Alter test for lazy shard loading for new potential server behaviour
- Change other lazy loading test too
- Fix CI image for 1.33
- Update protos, fix setting of batch client in wrapper to avoid races with connection
- Remove debug assert in test
- Update new batch to use different modes with server, update CI image
- Refactor to changed server batching options
- Throw error if using automatic batching with incompatible server
- Add exponential backoff retry to stream reconnect method
- Remove timeout and retries from new grpc methods
- Only delete key if present in dict
- Close before re-connecting, reset rec num objs on shutdown
- Update to use latest protos and behaviour
- Improve logging using .automatic()
- Update CI image to latest server build
- Fix testing issues with new versions
- Attempt fixes for tests again
- Add ability to retry certain server-emitted full errors, e.g. temporary replication problems
- Attempt fixes of flakes
- Update to use latest server impl and CI image
- Update to use latest dev server version
- Rename from automatic to experimental, bump CI version to latest RC
- Push ongoing changes
- Update to use latest server image
- Update to use latest server changes
- Undo debug changes to conftest
- Update to use latest server image
- Make internal send/recv queue size 1 and sleep while shutdown to avoid pushing to it
- Update to use latest server image
- Fix shutting down message handling
- Skip backoff handling if client has closed the stream
- Remove unused code
- Don't print backoff adjustments when shutting down
- Improve shutting down log
- Attempt to catch last req that can be lost during shutdown
- Avoid circular import
- Remove last_req wrapping logic from stream, reduce logging, update image in CI
- Close the client-side of the stream on shutdown, sleep for backoff during req generation
- Update CI image
- Only log waiting for stream re-establishment once
- Switch from arm to amd in CI
- Shutdown client-side stream regardless of size of __reqs queue
- Increase timeout when waiting for req to send, don't use queue size in if due to unreliability
- Use sentinel in req put/get to avoid inaccurate block timeouts
- Update CI image
- Correctly populate batch.results
- Update CI images
- Assert indexing status is in one of the allowed values rather than a specific value
- Undo debug changes in tests
- Update to match new server impl
- Update to use latest server image
- Only start threads once to avoid runtime error when handling shutdown
- Update CI images
- Hard-code SSB concurrency to 1 for now
- Fix collection.batch.automatic
- Correct logic in `_BgThreads.is_alive`
- Adjust default batch size to align with server default and avoid overloading server too fast
- Update CI images and version checks in tests
- Update to use latest server behaviour around backoffs and uuid/err results
- Lock once when reading batch results from stream
- Interpret context canceled as ungraceful shutdown to be restarted by client
- Use backoff message to adjust batch size
- Start batching with smallest allowed server value
- Add extra log in batch send
- Reintroduce timeout when getting from queue
- Add log to empty queue
- Add log to batch recv restart
- Remove timeout when getting from internal queue
- Only update batch size if value has changed
- Track then log total number of objects pushed by client
- WIP: receive shutdown as message and not rpc error
- Move result writing inside message.results case
- Add missing proto changes
- Update CI image
- Improve resilience on unexpected server behaviour

Co-authored-by: Dirk Kulawiak <dirk@semi.technology>
add delete_property_index to collection config
Add support for cancelling backup restore ops
…l-config Introduce async repl conf in (re)configure and unit tests of usage
Add Hfresh index type
Pull request overview
Updates the Python client to support Weaviate v1.36 features, notably server-side batch streaming (sync + async), plus new HFRESH vector index and async replication config options, along with corresponding proto/stub updates and test/workflow adjustments.
Changes:
- Add server-side batch streaming implementations for sync and async clients/collections, including new wrappers/context managers and updated gRPC stream handling.
- Add HFRESH vector index config + parsing, plus async replication configuration support and a new schema endpoint helper to delete property indexes.
- Extend backup cancel API to support canceling restore operations; update CI/test matrices and regenerate protobuf artifacts.
Reviewed changes
Copilot reviewed 57 out of 57 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| weaviate/proto/v1/v6300/v1/generative_pb2.pyi | Adds stop_sequences to AWS generative config stub. |
| weaviate/proto/v1/v6300/v1/batch_pb2.pyi | Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown). |
| weaviate/proto/v1/v6300/v1/batch_pb2.py | Regenerated protobuf module reflecting updated BatchStreamReply schema. |
| weaviate/proto/v1/v5261/v1/generative_pb2.pyi | Adds stop_sequences to AWS generative config stub. |
| weaviate/proto/v1/v5261/v1/batch_pb2.pyi | Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown). |
| weaviate/proto/v1/v5261/v1/batch_pb2.py | Regenerated protobuf module reflecting updated BatchStreamReply schema. |
| weaviate/proto/v1/v4216/v1/generative_pb2.pyi | Adds stop_sequences to AWS generative config stub. |
| weaviate/proto/v1/v4216/v1/batch_pb2.pyi | Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown). |
| weaviate/proto/v1/v4216/v1/batch_pb2.py | Regenerated protobuf module reflecting updated BatchStreamReply schema. |
| weaviate/proto/v1/regen.sh | Makes proto regen script accept a configurable Weaviate source directory. |
| weaviate/outputs/config.py | Exposes VectorIndexConfigHFresh in outputs exports. |
| weaviate/exceptions.py | Adds WeaviateBatchFailedToReestablishStreamError. |
| weaviate/connect/v4.py | Adds streaming timeout support; adds async batch stream helpers; allows async connect(force). |
| weaviate/connect/base.py | Adds host helpers (GCP/WCD detection) used by batch streaming. |
| weaviate/config.py | Adds Timeout.stream for streaming operations. |
| weaviate/collections/data/sync.pyi | Adds ingest() to sync collection data typing. |
| weaviate/collections/data/async_.pyi | Adds ingest() to async collection data typing. |
| weaviate/collections/data/executor.py | Implements ingest() using collection batch streaming context managers. |
| weaviate/collections/config/sync.pyi | Adds HFRESH update typing and delete_property_index() typing. |
| weaviate/collections/config/async_.pyi | Adds HFRESH update typing and delete_property_index() typing. |
| weaviate/collections/config/executor.py | Implements delete_property_index() REST call and HFRESH typing support. |
| weaviate/collections/collection/sync.py | Switches collection batch implementation to v1.36 stream-capable sync batch. |
| weaviate/collections/collection/async_.py | Adds async collection batch wrapper namespace for streaming batching. |
| weaviate/collections/classes/config_vectors.py | Adds HFRESH vector index create/update support for vectors config helpers. |
| weaviate/collections/classes/config_vector_index.py | Adds VectorIndexType.HFRESH and HFRESH create/update models + factory. |
| weaviate/collections/classes/config_named_vectors.py | Adds HFRESH update typing for named vectors updates. |
| weaviate/collections/classes/config_methods.py | Adds schema parsing for HFRESH index config. |
| weaviate/collections/classes/config.py | Adds HFRESH config dataclass, async replication config models, IndexName alias, factories. |
| weaviate/collections/batch/sync.py | New sync server-side batch streaming implementation. |
| weaviate/collections/batch/async_.py | New async server-side batch streaming implementation. |
| weaviate/collections/batch/grpc_batch.py | Adds async stream entrypoint and updates stream docstrings/typing. |
| weaviate/collections/batch/collection.py | Introduces sync/async batch collection types and stream() API; deprecates experimental(). |
| weaviate/collections/batch/client.py | Introduces sync/async batch client types and stream() API; deprecates experimental(). |
| weaviate/collections/batch/batch_wrapper.py | Adds async batch wrapper/context manager variants and async protocols. |
| weaviate/collections/batch/base.py | Refactors batch base utilities; adds async node cluster helper; changes nodes status behavior. |
| weaviate/client.pyi | Exposes batch on async client typing. |
| weaviate/client.py | Instantiates async client batch wrapper. |
| weaviate/classes/config.py | Re-exports IndexName. |
| weaviate/backup/sync.pyi | Extends typing for backup cancel operation parameter. |
| weaviate/backup/async_.pyi | Extends typing for backup cancel operation parameter. |
| weaviate/backup/executor.py | Adds operation param to cancel backup or restore endpoints. |
| test/collection/test_config.py | Adds unit tests for replication config create/update serialization. |
| profiling/test_sphere.py | Updates profiling workload to use batch stream; adds async profiling variant. |
| profiling/test_shutdown.py | Updates profiling shutdown test to use batch stream API. |
| profiling/conftest.py | Adds async collection factory and enables async replication in profiling collections. |
| mock_tests/test_batch.py | Adds mock test for cancelled stream recovery behavior. |
| mock_tests/conftest.py | Updates mock Weaviate version to 1.36 and adds HEALTHY node status. |
| integration/test_rbac.py | Updates RBAC batching test to stream() and v1.36 gate. |
| integration/test_collection_hfresh.py | Adds integration tests covering HFRESH create/update/export/import. |
| integration/test_collection_config.py | Adds integration test for deleting property indexes via new endpoint. |
| integration/test_collection_batch.py | Adds async batch stream + ingest integration tests. |
| integration/test_batch_v4.py | Adds async client batching tests and updates stream gating/version checks. |
| integration/test_backup_v4.py | Splits cancel backup test and adds cancel restore test for v1.36+. |
| .github/workflows/main.yaml | Adds Weaviate 1.36 RC to CI and updates test matrix to target it. |
```python
if self.__is_shutting_down.is_set():
    logger.info("Server shutting down, closing the client-side of the stream")
    return
elif self.__is_oom.is_set():
    logger.info("Server out-of-memory, closing the client-side of the stream")
    return
```
out_of_memory handling sets __is_oom, but the send generator then immediately closes the stream when the request queue is empty (elif self.__is_oom.is_set(): ... return). __is_oom is only cleared on a shutting_down message, and __recv() never reconnects on OOM, so the batch will not actually “wait for the server to scale-up” as the log suggests; it will either stall until __oom_wait_time and then raise, or just terminate when the stream closes. Consider either keeping the stream open during OOM and resuming when the server indicates recovery, or explicitly reconnecting/resetting __is_oom after the requested wait time and restarting __recv() (similar to the shutting_down/GCP renewal paths).
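A minimal sketch of the second suggested option (back off for the requested wait time, clear the flag, and reconnect instead of closing the client side for good). All names here (`OomAwareSender`, `is_oom`, `oom_wait_time`, `reconnect`, `on_queue_empty`) are illustrative stand-ins, not the client's real `_BatchStream` internals:

```python
import threading
import time


class OomAwareSender:
    """Illustrative only: mirrors the shutting_down/GCP-renewal paths for OOM."""

    def __init__(self, oom_wait_time: float = 0.01) -> None:
        self.is_oom = threading.Event()
        self.oom_wait_time = oom_wait_time
        self.reconnects = 0

    def reconnect(self) -> None:
        # Stand-in for closing and re-opening the gRPC stream.
        self.reconnects += 1

    def on_queue_empty(self) -> None:
        if self.is_oom.is_set():
            # Instead of returning (which ends the batch), back off for the
            # server-requested time, reset the flag, and resume sending.
            time.sleep(self.oom_wait_time)
            self.is_oom.clear()
            self.reconnect()


sender = OomAwareSender()
sender.is_oom.set()
sender.on_queue_empty()
print(sender.reconnects, sender.is_oom.is_set())  # 1 False
```

The key difference from the quoted code is that the OOM branch converges back into the send loop rather than terminating it.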
```python
except asyncio.TimeoutError:
    if self.__is_shutting_down.is_set():
        logger.info("Server shutting down, closing the client-side of the stream")
        return
    elif self.__is_oom.is_set():
        logger.info("Server out-of-memory, closing the client-side of the stream")
        return
    elif self.__is_hungup.is_set():
        logger.info("Detected hung up stream, closing the client-side of the stream")
        return
    logger.info("Timed out getting request from queue, but not stopping, continuing...")
logger.info("Batch send thread exiting due to exception...")
```
The async batch stream has the same OOM control-flow issue as the sync implementation: out_of_memory sets __is_oom, __send() exits early when __is_oom is set, and __recv() never reconnects/clears __is_oom on recovery (it’s only cleared on shutting_down). This contradicts the “wait for the server to scale-up” behavior and can lead to the batch stalling until timeout or terminating when the stream closes. Align the OOM path with the shutdown/hangup renewal logic (reconnect + clear flags + resume).
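The aligned OOM path can be sketched in asyncio form as well; again the names are illustrative, not the client's real attributes:

```python
import asyncio


class AsyncOomAwareSender:
    """Illustrative only: OOM backs off and resumes instead of exiting __send()."""

    def __init__(self, oom_wait_time: float = 0.01) -> None:
        self.is_oom = asyncio.Event()
        self.oom_wait_time = oom_wait_time
        self.reconnects = 0

    async def reconnect(self) -> None:
        # Stand-in for re-opening the stream, as the shutdown path does.
        self.reconnects += 1

    async def on_timeout(self) -> None:
        if self.is_oom.is_set():
            # Sleep for the backoff, clear the flag, and reconnect rather
            # than returning and tearing down the batch.
            await asyncio.sleep(self.oom_wait_time)
            self.is_oom.clear()
            await self.reconnect()


async def main() -> None:
    sender = AsyncOomAwareSender()
    sender.is_oom.set()
    await sender.on_timeout()
    print(sender.reconnects, sender.is_oom.is_set())  # 1 False


asyncio.run(main())
```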
```python
try:
    batch_object = BatchObject(
        collection=collection,
        properties=properties,
        references=references,
        uuid=uuid,
        vector=vector,
        tenant=tenant,
        index=self.__objs_count,
    )
    self.__results_for_wrapper.imported_shards.add(
        Shard(collection=collection, tenant=tenant)
    )
except ValidationError as e:
    raise WeaviateBatchValidationError(repr(e))
uuid = str(batch_object.uuid)
with self.__uuid_lookup_lock:
    self.__uuid_lookup.add(uuid)
    self.__batch_objects.add(self.__batch_grpc.grpc_object(batch_object._to_internal()))
with self.__objs_cache_lock:
    self.__objs_cache[uuid] = batch_object
    self.__objs_count += 1

# block if queue gets too long or weaviate is overloaded - reading files is
# faster than sending them so we do not need a long queue
while len(self.__batch_objects) >= self.__batch_size * 2:
    self.__check_bg_threads_alive()
    time.sleep(0.01)

assert batch_object.uuid is not None
return batch_object.uuid


def _add_reference(
    self,
    from_object_uuid: UUID,
    from_object_collection: str,
    from_property_name: str,
    to: ReferenceInput,
    tenant: Optional[str] = None,
) -> None:
    self.__check_bg_threads_alive()
    if isinstance(to, ReferenceToMulti):
        to_strs: Union[List[str], List[UUID]] = to.uuids_str
    elif isinstance(to, str) or isinstance(to, uuid_package.UUID):
        to_strs = [to]
    else:
        to_strs = list(to)

    for uid in to_strs:
        try:
            batch_reference = BatchReference(
                from_object_collection=from_object_collection,
                from_object_uuid=from_object_uuid,
                from_property_name=from_property_name,
                to_object_collection=(
                    to.target_collection if isinstance(to, ReferenceToMulti) else None
                ),
                to_object_uuid=uid,
                tenant=tenant,
                index=self.__refs_count,
            )
        except ValidationError as e:
            raise WeaviateBatchValidationError(repr(e))
        self.__batch_references.add(
            self.__batch_grpc.grpc_reference(batch_reference._to_internal())
        )
        with self.__refs_cache_lock:
            self.__refs_cache[batch_reference._to_beacon()] = batch_reference
            self.__refs_count += 1
```

From `_ClusterBatch.get_nodes_status()`:

```python
    response = executor.result(self._connection.get(path="/nodes"))
except Exception:
    return []
```
_ClusterBatch.get_nodes_status() now swallows all exceptions and returns an empty list. Callers (e.g., batch stream reconnect logic) use this to wait for all nodes to be HEALTHY, so returning [] on transient network/API failures can turn hard failures into indefinite loops and makes troubleshooting much harder. Consider restoring the previous behavior of raising a connection/empty-response error (or at least logging and surfacing the failure) instead of silently returning [].
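A minimal sketch of the suggested behavior: log and re-raise so that a persistently failing `/nodes` endpoint surfaces as an error rather than an indefinite wait-for-HEALTHY loop. `fetch_nodes` is a hypothetical stand-in for the real REST call:

```python
import logging
from typing import Callable, List

logger = logging.getLogger(__name__)


def get_nodes_status(fetch_nodes: Callable[[], List[str]]) -> List[str]:
    """Illustrative sketch: surface failures instead of returning []."""
    try:
        return fetch_nodes()
    except Exception as e:
        # Make the failure visible; callers' retry/backoff logic can then
        # decide whether to retry, instead of looping on an empty list.
        logger.warning("Failed to fetch node statuses: %r", e)
        raise


print(get_nodes_status(lambda: ["node1: HEALTHY"]))  # ['node1: HEALTHY']
```

Alternatively, the failure could be retried a bounded number of times before raising, which keeps resilience to transient blips without masking hard failures.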
```diff
 python3 -m grpc_tools.protoc \
-    -I ../../../../weaviate/grpc/proto \
+    -I $WEAVIATE_DIR/grpc/proto \
     --python_out="$output_dir" \
     --pyi_out="$output_dir" \
     --grpc_python_out="$output_dir" \
-    ../../../../weaviate/grpc/proto/v1/*.proto
+    $WEAVIATE_DIR/grpc/proto/v1/*.proto
```
WEAVIATE_DIR is expanded unquoted in the protoc invocation (-I $WEAVIATE_DIR/... and $WEAVIATE_DIR/.../*.proto). If the path contains spaces or glob characters, the script will break or behave unexpectedly. Quote these expansions (and consider using an array for the protoc args) to make the script robust.
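A small demonstration of why the quoting matters, using a hypothetical path with a space (not the script's real layout). The unquoted expansion word-splits the path, while the quoted form keeps it as one argument; an array keeps the eventual `protoc` invocation readable:

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical stand-in for a user-supplied source directory with a space.
WEAVIATE_DIR="/tmp/weaviate src"

# Unquoted: $WEAVIATE_DIR/grpc/proto splits into two words.
unquoted=( -I $WEAVIATE_DIR/grpc/proto )
# Quoted: the path survives as a single argument.
quoted=( -I "$WEAVIATE_DIR/grpc/proto" )

echo "unquoted: ${#unquoted[@]} args"  # unquoted: 3 args
echo "quoted: ${#quoted[@]} args"      # quoted: 2 args
```

In the regen script, building `quoted`-style arrays for the `-I` flags and passing `"$WEAVIATE_DIR"/grpc/proto/v1/*.proto` (glob outside the quotes) would make the invocation robust to spaces.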
```python
def stream(
    self,
    *,
    concurrency: Optional[int] = None,
    consistency_level: Optional[ConsistencyLevel] = None,
) -> ClientBatchingContextManager:
    """Configure the batching context manager to use batch streaming.

    When you exit the context manager, the final batch will be sent automatically.

    Args:
        concurrency: The number of concurrent streams to use when sending batches. If not provided, the default will be one.
        consistency_level: The consistency level to be used when inserting data. If not provided, the default value is `None`.
    """
    if self._connection._weaviate_version.is_lower_than(1, 36, 0):
        raise WeaviateUnsupportedFeatureError(
            "Server-side batching", str(self._connection._weaviate_version), "1.36.0"
        )
    self._batch_mode = _ServerSideBatching(
        # concurrency=concurrency
        # if concurrency is not None
        # else len(self._cluster.get_nodes_status())
        concurrency=1,  # hard-code until client-side multi-threading is fixed
    )
    self._consistency_level = consistency_level
    return self.__create_batch_and_reset(_BatchClientSync)
```
stream() accepts a concurrency argument but currently hard-codes concurrency=1 when constructing _ServerSideBatching, so the parameter is effectively ignored. This is an API footgun for users who expect concurrency to change behavior. Either implement concurrency, remove the parameter, or explicitly validate/reject values other than 1 and document that concurrency is not yet supported.
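The explicit-rejection option can be sketched as a small guard; the function name and error choice are illustrative, not the client's API:

```python
from typing import Optional


def validate_concurrency(concurrency: Optional[int]) -> int:
    """Illustrative guard: reject values stream() cannot honor yet."""
    if concurrency is not None and concurrency != 1:
        # Fail loudly instead of silently ignoring the caller's request.
        raise ValueError(
            f"concurrency={concurrency} is not yet supported; "
            "server-side batching currently uses a single stream"
        )
    return 1


print(validate_concurrency(None))  # 1
print(validate_concurrency(1))     # 1
```

Calling this at the top of `stream()` would make the temporary limitation visible to users until multi-stream support lands.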