Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ You can install the respective extension by calling `localstack extensions insta
| [Miniflare](https://github.com/localstack/localstack-extensions/tree/main/miniflare) | localstack-extension-miniflare | 0.1.0 | Experimental |
| [Stripe](https://github.com/localstack/localstack-extensions/tree/main/stripe) | localstack-extension-stripe | 0.2.0 | Stable |
| [Terraform Init](https://github.com/localstack/localstack-extensions/tree/main/terraform-init) | localstack-extension-terraform-init | 0.2.0 | Experimental |
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.2 | Experimental |
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.3 | Experimental |


## Developing Extensions
Expand Down
63 changes: 32 additions & 31 deletions typedb/localstack_typedb/utils/h2_proxy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import socket
from enum import Enum
from typing import Iterable, Callable

from h2.frame_buffer import FrameBuffer
Expand Down Expand Up @@ -63,6 +64,11 @@ def apply_http2_patches_for_grpc_support(
)
patched_connection = True

class ForwardingState(Enum):
UNDECIDED = "undecided"
FORWARDING = "forwarding"
PASSTHROUGH = "passthrough"

class ForwardingBuffer:
"""
A buffer atop the HTTP2 client connection, that will hold
Expand All @@ -72,7 +78,7 @@ class ForwardingBuffer:

backend: TcpForwarder
buffer: list
proxying: bool | None
state: ForwardingState

def __init__(self, http_response_stream):
self.http_response_stream = http_response_stream
Expand All @@ -81,7 +87,7 @@ def __init__(self, http_response_stream):
)
self.backend = TcpForwarder(target_port, host=target_host)
self.buffer = []
self.proxying = None
self.state = ForwardingState.UNDECIDED
reactor.getThreadPool().callInThread(
self.backend.receive_loop, self.received_from_backend
)
Expand All @@ -91,35 +97,30 @@ def received_from_backend(self, data):
self.http_response_stream.write(data)

def received_from_http2_client(self, data, default_handler: Callable):
if self.proxying is False:
# Note: Return here only if `proxying` is `False` (a value of `None` indicates
# that the headers have not fully been received yet)
return default_handler(data)

if self.proxying:
assert not self.buffer
# Keep sending data to the backend for the lifetime of this connection
self.backend.send(data)
return

self.buffer.append(data)

if not (headers := get_headers_from_data_stream(self.buffer)):
# If no headers received yet, then return (method will be called again for next chunk of data)
return

self.proxying = should_proxy_request(headers)

buffered_data = b"".join(self.buffer)
self.buffer = []

if not self.proxying:
# if this is not a target request, then call the default handler
default_handler(buffered_data)
return

LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
self.backend.send(buffered_data)
match self.state:
case ForwardingState.PASSTHROUGH:
default_handler(data)
case ForwardingState.FORWARDING:
assert not self.buffer
# Keep sending data to the backend for the lifetime of this connection
self.backend.send(data)
case ForwardingState.UNDECIDED:
self.buffer.append(data)

if headers := get_headers_from_data_stream(self.buffer):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nitpick: I guess we could reverse the logic with if not ... and return, to reduce the level of indentation. (but really not critical, more a matter of taste / personal preference, so feel free to leave as-is :) )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it could go either way. I personally find it clearer like this, but if the code grew longer, I'd switch it.

buffered_data = b"".join(self.buffer)
self.buffer = []

if should_proxy_request(headers):
self.state = ForwardingState.FORWARDING
LOG.debug(
f"Forwarding {len(buffered_data)} bytes to backend"
)
self.backend.send(buffered_data)
else:
self.state = ForwardingState.PASSTHROUGH
# if this is not a target request, then call the default handler
default_handler(buffered_data)

def close(self):
self.backend.close()
Expand Down
4 changes: 2 additions & 2 deletions typedb/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ build-backend = "setuptools.build_meta"

[project]
name = "localstack-extension-typedb"
version = "0.1.2"
version = "0.1.3"
description = "LocalStack Extension: TypeDB on LocalStack"
readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"}
requires-python = ">=3.9"
requires-python = ">=3.10"
authors = [
{ name = "LocalStack + TypeDB team"}
]
Expand Down