
Dev/1.36 #1935

Merged

dirkkul merged 141 commits into main from dev/1.36 on Feb 24, 2026
Conversation

@tsmith023
Collaborator

No description provided.

rlmanrique and others added 30 commits October 16, 2025 11:52
* flat index: Add support for RQ and include cache param (#1765)

* Add client-side changes to handle new server-side batching in 1.33

* Update images in CI

* Update 1.33 image in CI

* Alter test or lazy shard loading for new potential server behaviour

* Change other lazy loading test too

* Fix CI image for 1.33

* Update protos, fix setting of batch client in wrapper to avoid races with connection

* Remove debug assert in test

* Update new batch to use different modes with server, update CI image

* Refactor to changed server batching options

* Throw error if using automatic batching with incompatible server

* Add exponential backoff retry to stream reconnect method

* Remove timeout and retries from new grpc methods

* Only delete key if present in dict

* Close before re-connecting, reset rec num objs on shutdown

* Update to use latest protos and behaviour

* Improve logging using .automatic()

* Update CI image to latest server build

* Fix testing issues with new versions

* Attempt fixes for tests again

* Add ability to retry certain server-emitted full errors, e.g. temporary replication problems

* Attempt fixes of flakes

* Update to use latest server impl and CI image

* Update to use latest dev server version

* Rename from automatic to experimental, bump CI version to latest RC

* Push ongoing changes

* Update to use latest server image

* Update to use latest server changes

* Undo debug changes to conftest

* Update to use latest server image

* Make internal send/recv queue size 1 and sleep while shutdown to avoid pushing to it

* Update to use latest server image

* Fix shutting down message handling

* Skip backoff handling if client has closed the stream

* Remove unused code

* Don't print backoff adjustments when shutting down

* Improve shutting down log

* Attempt to catch last req that can be lost during shutdown

* Avoid circular import

* Remove last_req wrapping logic from stream, reduce logging, update image in ci

* Close the client-side of the stream on shutdown, sleep for backoff during req generation

* Update CI image

* Only log waiting for stream re-establishment once

* Switch from arm to amd in CI

* Shutdown client-side stream regardless of size of __reqs queue

* Increase timeout when waiting for req to send, don't use queue size in if due to unreliability

* Use sentinel in req put/get to avoid inaccurate block timeouts

* Update CI image

* Correctly populate batch.results

* Update CI images

* Assert indexing status in one of the allowed values rather than a specific value

* Undo debug changes in tests

* Update to match new server impl

* Update to use latest server image

* Only start threads once to avoid runtime error when handling shutdown

* Update CI images

* Hard-code SSB concurrency to 1 for now

* Fix collection.batch.automatic

* Correct logic in `_BgThreads.is_alive`

* Adjust default batch size to align with server default and avoid overloading server too fast

* Update CI images and version checks in tests

* Update to use latest server behaviour around backoffs and uuid/err results

* Lock once when reading batch results from stream

* Interpret context canceled as ungraceful shutdown to be restarted by client

* Use backoff message to adjust batch size

* Start batching with smallest allowed server value

* Add extra log in batch send

* Reintroduce timeout when getting from queue

* Add log to empty queue

* Add log to batch recv restart

* Remove timeout when getting from internal queue

* Only update batch size if value has changed

* Track then log total number of objects pushed by client

* WIP: receive shutdown as message and not rpc error

* Move result writing inside message.results case

* Add missing proto changes

* Update CI image

* Improve resilience on unexpected server behaviour

---------

Co-authored-by: Dirk Kulawiak <dirk@semi.technology>
…ient into introduce-experimental-batching-to-async-client
@dirkkul dirkkul marked this pull request as ready for review February 24, 2026 07:51
@dirkkul dirkkul requested a review from a team as a code owner February 24, 2026 07:51
Copilot AI review requested due to automatic review settings February 24, 2026 07:51
@dirkkul dirkkul merged commit 606b43c into main Feb 24, 2026
119 of 123 checks passed
Copilot AI (Contributor) left a comment

Pull request overview

Updates the Python client to support Weaviate v1.36 features, notably server-side batch streaming (sync + async), plus new HFRESH vector index and async replication config options, along with corresponding proto/stub updates and test/workflow adjustments.

Changes:

  • Add server-side batch streaming implementations for sync and async clients/collections, including new wrappers/context managers and updated gRPC stream handling.
  • Add HFRESH vector index config + parsing, plus async replication configuration support and a new schema endpoint helper to delete property indexes.
  • Extend backup cancel API to support canceling restore operations; update CI/test matrices and regenerate protobuf artifacts.
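Based on the summaries above, here is a minimal sketch of how the new streaming batch context manager might be driven from user code. Method names (`client.batch.stream`, `batch.add_object`) follow this PR's descriptions; the exact signatures are assumptions, not verified against the release.

```python
# Hypothetical usage sketch of the v1.36 server-side batch streaming API.
# Names follow the file summaries in this PR; exact signatures are assumptions.

def ingest_articles(client, titles):
    """Push objects through the streaming batch context manager.

    The final batch is flushed automatically when the context exits.
    """
    with client.batch.stream() as batch:
        for title in titles:
            batch.add_object(collection="Article", properties={"title": title})
```

The context-manager shape mirrors the existing client-side batching API, so exiting the `with` block is what guarantees the last partial batch is sent.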

Reviewed changes

Copilot reviewed 57 out of 57 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
weaviate/proto/v1/v6300/v1/generative_pb2.pyi Adds stop_sequences to AWS generative config stub.
weaviate/proto/v1/v6300/v1/batch_pb2.pyi Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown).
weaviate/proto/v1/v6300/v1/batch_pb2.py Regenerated protobuf module reflecting updated BatchStreamReply schema.
weaviate/proto/v1/v5261/v1/generative_pb2.pyi Adds stop_sequences to AWS generative config stub.
weaviate/proto/v1/v5261/v1/batch_pb2.pyi Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown).
weaviate/proto/v1/v5261/v1/batch_pb2.py Regenerated protobuf module reflecting updated BatchStreamReply schema.
weaviate/proto/v1/v4216/v1/generative_pb2.pyi Adds stop_sequences to AWS generative config stub.
weaviate/proto/v1/v4216/v1/batch_pb2.pyi Updates BatchStreamReply stub (acks/out_of_memory; removes shutdown).
weaviate/proto/v1/v4216/v1/batch_pb2.py Regenerated protobuf module reflecting updated BatchStreamReply schema.
weaviate/proto/v1/regen.sh Makes proto regen script accept a configurable Weaviate source directory.
weaviate/outputs/config.py Exposes VectorIndexConfigHFresh in outputs exports.
weaviate/exceptions.py Adds WeaviateBatchFailedToReestablishStreamError.
weaviate/connect/v4.py Adds streaming timeout support; adds async batch stream helpers; allows async connect(force).
weaviate/connect/base.py Adds host helpers (GCP/WCD detection) used by batch streaming.
weaviate/config.py Adds Timeout.stream for streaming operations.
weaviate/collections/data/sync.pyi Adds ingest() to sync collection data typing.
weaviate/collections/data/async_.pyi Adds ingest() to async collection data typing.
weaviate/collections/data/executor.py Implements ingest() using collection batch streaming context managers.
weaviate/collections/config/sync.pyi Adds HFRESH update typing and delete_property_index() typing.
weaviate/collections/config/async_.pyi Adds HFRESH update typing and delete_property_index() typing.
weaviate/collections/config/executor.py Implements delete_property_index() REST call and HFRESH typing support.
weaviate/collections/collection/sync.py Switches collection batch implementation to v1.36 stream-capable sync batch.
weaviate/collections/collection/async_.py Adds async collection batch wrapper namespace for streaming batching.
weaviate/collections/classes/config_vectors.py Adds HFRESH vector index create/update support for vectors config helpers.
weaviate/collections/classes/config_vector_index.py Adds VectorIndexType.HFRESH and HFRESH create/update models + factory.
weaviate/collections/classes/config_named_vectors.py Adds HFRESH update typing for named vectors updates.
weaviate/collections/classes/config_methods.py Adds schema parsing for HFRESH index config.
weaviate/collections/classes/config.py Adds HFRESH config dataclass, async replication config models, IndexName alias, factories.
weaviate/collections/batch/sync.py New sync server-side batch streaming implementation.
weaviate/collections/batch/async_.py New async server-side batch streaming implementation.
weaviate/collections/batch/grpc_batch.py Adds async stream entrypoint and updates stream docstrings/typing.
weaviate/collections/batch/collection.py Introduces sync/async batch collection types and stream() API; deprecates experimental().
weaviate/collections/batch/client.py Introduces sync/async batch client types and stream() API; deprecates experimental().
weaviate/collections/batch/batch_wrapper.py Adds async batch wrapper/context manager variants and async protocols.
weaviate/collections/batch/base.py Refactors batch base utilities; adds async node cluster helper; changes nodes status behavior.
weaviate/client.pyi Exposes batch on async client typing.
weaviate/client.py Instantiates async client batch wrapper.
weaviate/classes/config.py Re-exports IndexName.
weaviate/backup/sync.pyi Extends typing for backup cancel operation parameter.
weaviate/backup/async_.pyi Extends typing for backup cancel operation parameter.
weaviate/backup/executor.py Adds operation param to cancel backup or restore endpoints.
test/collection/test_config.py Adds unit tests for replication config create/update serialization.
profiling/test_sphere.py Updates profiling workload to use batch stream; adds async profiling variant.
profiling/test_shutdown.py Updates profiling shutdown test to use batch stream API.
profiling/conftest.py Adds async collection factory and enables async replication in profiling collections.
mock_tests/test_batch.py Adds mock test for cancelled stream recovery behavior.
mock_tests/conftest.py Updates mock Weaviate version to 1.36 and adds HEALTHY node status.
integration/test_rbac.py Updates RBAC batching test to stream() and v1.36 gate.
integration/test_collection_hfresh.py Adds integration tests covering HFRESH create/update/export/import.
integration/test_collection_config.py Adds integration test for deleting property indexes via new endpoint.
integration/test_collection_batch.py Adds async batch stream + ingest integration tests.
integration/test_batch_v4.py Adds async client batching tests and updates stream gating/version checks.
integration/test_backup_v4.py Splits cancel backup test and adds cancel restore test for v1.36+.
.github/workflows/main.yaml Adds Weaviate 1.36 RC to CI and updates test matrix to target it.


Comment on lines +297 to +302
        if self.__is_shutting_down.is_set():
            logger.info("Server shutting down, closing the client-side of the stream")
            return
        elif self.__is_oom.is_set():
            logger.info("Server out-of-memory, closing the client-side of the stream")
            return
Copilot AI, Feb 24, 2026:
out_of_memory handling sets __is_oom, but the send generator then immediately closes the stream when the request queue is empty (elif self.__is_oom.is_set(): ... return). __is_oom is only cleared on a shutting_down message, and __recv() never reconnects on OOM, so the batch will not actually “wait for the server to scale-up” as the log suggests; it will either stall until __oom_wait_time and then raise, or just terminate when the stream closes. Consider either keeping the stream open during OOM and resuming when the server indicates recovery, or explicitly reconnecting/resetting __is_oom after the requested wait time and restarting __recv() (similar to the shutting_down/GCP renewal paths).
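The renewal path the reviewer suggests could be sketched in isolation with threading primitives. The flag and timing names (`is_oom`, `wait_time`) mirror the ones discussed above, but the surrounding class and the `reconnect` callback are hypothetical, not the client's actual internals:

```python
import threading
import time

def wait_out_oom(is_oom: threading.Event, reconnect, wait_time: float = 5.0, retries: int = 3):
    """Sketch of an OOM renewal path: instead of closing the stream for good,
    wait out the server's back-off, clear the flag, and try to reconnect,
    mirroring the shutting_down/GCP renewal paths described in the review.

    `reconnect` is a hypothetical callable that re-opens the gRPC stream and
    returns True on success."""
    for _ in range(retries):
        if not is_oom.is_set():
            return True  # server recovered on its own; resume sending
        time.sleep(wait_time)
        is_oom.clear()   # optimistically reset and try to re-establish
        if reconnect():
            return True
        is_oom.set()     # reconnect failed; treat the server as still OOM
    return False         # give up after retries; the caller should raise
```

The key difference from the merged behaviour is that the send path resumes after a successful reconnect instead of terminating when the stream closes.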

Comment on lines +346 to +357
        except asyncio.TimeoutError:
            if self.__is_shutting_down.is_set():
                logger.info("Server shutting down, closing the client-side of the stream")
                return
            elif self.__is_oom.is_set():
                logger.info("Server out-of-memory, closing the client-side of the stream")
                return
            elif self.__is_hungup.is_set():
                logger.info("Detected hung up stream, closing the client-side of the stream")
                return
            logger.info("Timed out getting request from queue, but not stopping, continuing...")
        logger.info("Batch send thread exiting due to exception...")
Copilot AI, Feb 24, 2026:

The async batch stream has the same OOM control-flow issue as the sync implementation: out_of_memory sets __is_oom, __send() exits early when __is_oom is set, and __recv() never reconnects/clears __is_oom on recovery (it’s only cleared on shutting_down). This contradicts the “wait for the server to scale-up” behavior and can lead to the batch stalling until timeout or terminating when the stream closes. Align the OOM path with the shutdown/hangup renewal logic (reconnect + clear flags + resume).

Comment on lines 914 to +917
        try:
            batch_object = BatchObject(
                collection=collection,
                properties=properties,
                references=references,
                uuid=uuid,
                vector=vector,
                tenant=tenant,
                index=self.__objs_count,
            )
            self.__results_for_wrapper.imported_shards.add(
                Shard(collection=collection, tenant=tenant)
            )
        except ValidationError as e:
            raise WeaviateBatchValidationError(repr(e))
        uuid = str(batch_object.uuid)
        with self.__uuid_lookup_lock:
            self.__uuid_lookup.add(uuid)
        self.__batch_objects.add(self.__batch_grpc.grpc_object(batch_object._to_internal()))
        with self.__objs_cache_lock:
            self.__objs_cache[uuid] = batch_object
        self.__objs_count += 1

        # block if queue gets too long or weaviate is overloaded - reading files is faster than sending them so we do
        # not need a long queue
        while len(self.__batch_objects) >= self.__batch_size * 2:
            self.__check_bg_threads_alive()
            time.sleep(0.01)

        assert batch_object.uuid is not None
        return batch_object.uuid

    def _add_reference(
        self,
        from_object_uuid: UUID,
        from_object_collection: str,
        from_property_name: str,
        to: ReferenceInput,
        tenant: Optional[str] = None,
    ) -> None:
        self.__check_bg_threads_alive()
        if isinstance(to, ReferenceToMulti):
            to_strs: Union[List[str], List[UUID]] = to.uuids_str
        elif isinstance(to, str) or isinstance(to, uuid_package.UUID):
            to_strs = [to]
        else:
            to_strs = list(to)

        for uid in to_strs:
            try:
                batch_reference = BatchReference(
                    from_object_collection=from_object_collection,
                    from_object_uuid=from_object_uuid,
                    from_property_name=from_property_name,
                    to_object_collection=(
                        to.target_collection if isinstance(to, ReferenceToMulti) else None
                    ),
                    to_object_uuid=uid,
                    tenant=tenant,
                    index=self.__refs_count,
                )
            except ValidationError as e:
                raise WeaviateBatchValidationError(repr(e))
            self.__batch_references.add(
                self.__batch_grpc.grpc_reference(batch_reference._to_internal())
            )
            with self.__refs_cache_lock:
                self.__refs_cache[batch_reference._to_beacon()] = batch_reference
            self.__refs_count += 1

    # (unrelated diff context elided; the fragment below is from _ClusterBatch.get_nodes_status)
        try:
            response = executor.result(self._connection.get(path="/nodes"))
        except Exception:
            return []
Copilot AI, Feb 24, 2026:

_ClusterBatch.get_nodes_status() now swallows all exceptions and returns an empty list. Callers (e.g., batch stream reconnect logic) use this to wait for all nodes to be HEALTHY, so returning [] on transient network/API failures can turn hard failures into indefinite loops and makes troubleshooting much harder. Consider restoring the previous behavior of raising a connection/empty-response error (or at least logging and surfacing the failure) instead of silently returning [].
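A hedged sketch of the alternative the reviewer suggests: surface transient failures instead of returning `[]`. The class and `connection.get` helper here are illustrative stand-ins, not the client's actual internals:

```python
import logging

logger = logging.getLogger(__name__)

class EmptyResponseError(Exception):
    """Raised when the nodes endpoint yields no usable status."""

def get_nodes_status(connection):
    """Fetch /nodes and raise on failure instead of silently returning [].

    `connection.get` stands in for the client's REST helper; the point is
    that callers polling for HEALTHY nodes see the real error rather than
    looping indefinitely on an empty list."""
    try:
        response = connection.get(path="/nodes")
    except Exception as e:
        logger.warning("Failed to fetch node statuses: %r", e)
        raise
    nodes = response.get("nodes") if isinstance(response, dict) else None
    if not nodes:
        raise EmptyResponseError("/nodes returned no node statuses")
    return nodes
```

With this shape, a reconnect loop waiting for all nodes to become HEALTHY fails fast on network or API errors instead of spinning on `[]`.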

Comment on lines 45 to +50
 python3 -m grpc_tools.protoc \
-    -I ../../../../weaviate/grpc/proto \
+    -I $WEAVIATE_DIR/grpc/proto \
     --python_out="$output_dir" \
     --pyi_out="$output_dir" \
     --grpc_python_out="$output_dir" \
-    ../../../../weaviate/grpc/proto/v1/*.proto
+    $WEAVIATE_DIR/grpc/proto/v1/*.proto
Copilot AI, Feb 24, 2026:

WEAVIATE_DIR is expanded unquoted in the protoc invocation (-I $WEAVIATE_DIR/... and $WEAVIATE_DIR/.../*.proto). If the path contains spaces or glob characters, the script will break or behave unexpectedly. Quote these expansions (and consider using an array for the protoc args) to make the script robust.
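A sketch of the quoting fix the reviewer suggests, building the protoc arguments as an array so paths with spaces survive. Variable names follow the script excerpt above, but the default path and overall script layout are assumptions:

```shell
#!/usr/bin/env bash
set -euo pipefail

WEAVIATE_DIR="${1:-$HOME/weaviate}"   # illustrative default, not the script's
output_dir="out"
mkdir -p "$output_dir"

# Quote every expansion except the intentional *.proto glob: quote the
# directory portion and let only the glob itself expand unquoted.
args=(
  -I "$WEAVIATE_DIR/grpc/proto"
  --python_out="$output_dir"
  --pyi_out="$output_dir"
  --grpc_python_out="$output_dir"
)
args+=("$WEAVIATE_DIR"/grpc/proto/v1/*.proto)

# Guarded so the sketch degrades gracefully where grpc_tools is absent.
if python3 -c 'import grpc_tools' 2>/dev/null; then
  python3 -m grpc_tools.protoc "${args[@]}"
else
  echo "grpc_tools not installed; skipping codegen"
fi
```

Using `"${args[@]}"` preserves each argument as one word even when `$WEAVIATE_DIR` contains spaces, which is the failure mode the review flags.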

Comment on lines +275 to +300
    def stream(
        self,
        *,
        concurrency: Optional[int] = None,
        consistency_level: Optional[ConsistencyLevel] = None,
    ) -> ClientBatchingContextManager:
        """Configure the batching context manager to use batch streaming.

        When you exit the context manager, the final batch will be sent automatically.

        Args:
            concurrency: The number of concurrent streams to use when sending batches. If not provided, the default will be one.
            consistency_level: The consistency level to be used when inserting data. If not provided, the default value is `None`.
        """
        if self._connection._weaviate_version.is_lower_than(1, 36, 0):
            raise WeaviateUnsupportedFeatureError(
                "Server-side batching", str(self._connection._weaviate_version), "1.36.0"
            )
        self._batch_mode = _ServerSideBatching(
            # concurrency=concurrency
            # if concurrency is not None
            # else len(self._cluster.get_nodes_status())
            concurrency=1,  # hard-code until client-side multi-threading is fixed
        )
        self._consistency_level = consistency_level
        return self.__create_batch_and_reset(_BatchClientSync)
Copilot AI, Feb 24, 2026:

stream() accepts a concurrency argument but currently hard-codes concurrency=1 when constructing _ServerSideBatching, so the parameter is effectively ignored. This is an API footgun for users who expect concurrency to change behavior. Either implement concurrency, remove the parameter, or explicitly validate/reject values other than 1 and document that concurrency is not yet supported.
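One minimal way to close the footgun, sketched outside the real class (the function name is hypothetical; only the hard-coded limit comes from the snippet above):

```python
def resolve_concurrency(concurrency=None):
    """Sketch: reject unsupported concurrency values explicitly rather than
    silently ignoring them, until client-side multi-threading is fixed."""
    if concurrency is None:
        return 1  # the current effective default
    if concurrency != 1:
        raise ValueError(
            "concurrency is not yet supported by batch streaming; "
            "only concurrency=1 is currently allowed"
        )
    return concurrency
```

Failing loudly keeps the parameter in the signature for forward compatibility while making the current limitation visible to callers.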

7 participants