Skip to content

Commit

Permalink
wip Hub support
Browse files Browse the repository at this point in the history
  • Loading branch information
guyluz11 committed Jan 13, 2024
1 parent 0b5af4b commit ca02e27
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 60 deletions.
24 changes: 21 additions & 3 deletions lib/domain/connections_service.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
import 'dart:async';
import 'dart:collection';
import 'dart:io';

import 'package:cbj_integrations_controller/integrations_controller.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:cybearjinni/infrastructure/core/injection.dart';
import 'package:cybearjinni/infrastructure/core/logger.dart';
import 'package:cybearjinni/infrastructure/network_utilities_flutter.dart';
import 'package:dartz/dartz.dart';
import 'package:flutter/foundation.dart';
import 'package:grpc/grpc.dart';
import 'package:location/location.dart';
import 'package:network_info_plus/network_info_plus.dart';
import 'package:network_tools_flutter/network_tools_flutter.dart';
import 'package:permission_handler/permission_handler.dart'
as permission_handler;
import 'package:rxdart/rxdart.dart';

part 'package:cybearjinni/infrastructure/connection_service/app_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/hub_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/demo_connection_service.dart';
part 'package:cybearjinni/infrastructure/connection_service/none_connection_service.dart';

Expand All @@ -30,13 +44,16 @@ abstract interface class ConnectionsService {
if (connectionType == _currentConnectionType) {
return;
}
final ConnectionsService? oldInstance = _instance;
_instance?.dispose();

switch (connectionType) {
case ConnectionType.appAsHub:
_instance = _AppConnectionService();
_currentConnectionType = ConnectionType.appAsHub;
case ConnectionType.demo:
case ConnectionType.hub:
_currentConnectionType = ConnectionType.hub;
_instance = _HubConnectionService();
case ConnectionType.demo:
case ConnectionType.remotePipes:
_instance = _DemoConnectionService();
_currentConnectionType = ConnectionType.demo;
Expand All @@ -45,11 +62,12 @@ abstract interface class ConnectionsService {
_instance = _NoneConnectionService();
_currentConnectionType = ConnectionType.none;
}
oldInstance?.dispose();
}

static ConnectionType getCurrentConnectionType() => _currentConnectionType;

Future<bool> connect();

Future searchDevices();

void setEntityState(ActionObject action);
Expand Down
1 change: 1 addition & 0 deletions lib/domain/i_hub_connection_repository.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:cbj_integrations_controller/integrations_controller.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:cybearjinni/domain/i_local_db_repository.dart';
import 'package:cybearjinni/infrastructure/core/injection.dart';
import 'package:cybearjinni/infrastructure/core/logger.dart';
import 'package:dartz/dartz.dart';
import 'package:flutter/foundation.dart' show kIsWeb;
import 'package:location/location.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ class _AppConnectionService implements ConnectionsService {
@override
Future<List<VendorEntityInformation>> getVendors() async =>
IcSynchronizer().getVendors();

@override
Future<bool> connect() async => true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ class _DemoConnectionService implements ConnectionsService {
@override
Future<List<VendorEntityInformation>> getVendors() async =>
IcSynchronizer().getVendors();

@override
Future<bool> connect() async => true;
}
287 changes: 287 additions & 0 deletions lib/infrastructure/connection_service/hub_connection_service.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
part of 'package:cybearjinni/domain/connections_service.dart';

class _HubConnectionService implements ConnectionsService {
_HubConnectionService() {
if (currentEnvApp == EnvApp.prod) {
hubPort = 50055;
} else {
hubPort = 60055;
}
}

/// Port to connect to the cbj hub, will change according to the current
/// running environment
late int hubPort;

String? hubIp;
String? networkBssid;
String? networkName;

ClientChannel? channel;
CbjHubClient? stub;

StreamController<MapEntry<String, DeviceEntityBase>>? entitiesStream;
StreamController<MapEntry<String, AreaEntity>>? areasStream;

BehaviorSubject<RequestsAndStatusFromHub> hubMessagesToApp =
BehaviorSubject<RequestsAndStatusFromHub>();

// StreamController<ClientStatusRequests> appMessagesToHub =
// StreamController<ClientStatusRequests>();

BehaviorSubject<ClientStatusRequests> appMessagesToHub = BehaviorSubject();

@override
Future dispose() async {
entitiesStream?.close();
}

@override
Future<HashMap<String, DeviceEntityBase>> get getAllEntities async {
appMessagesToHub.sink
.add(ClientStatusRequests(sendingType: SendingType.firstConnection));
await for (final RequestsAndStatusFromHub message
in hubMessagesToApp.stream) {
logger.i('message from hub is $message');
}
return HashMap();
}

@override
Future<HashMap<String, AreaEntity>> get getAllAreas async => HashMap();

@override
Future searchDevices() async {
try {
final Either<HubFailures, Unit> locationRequest =
await askLocationPermissionAndLocationOn();

if (locationRequest.isLeft()) {
return locationRequest;
}

logger.i('searchForHub');
String? appDeviceIp;
if (await Connectivity().checkConnectivity() == ConnectivityResult.wifi &&
!kIsWeb) {
final NetworkInfo networkInfo = NetworkInfo();
networkName = await networkInfo.getWifiName();
appDeviceIp = await networkInfo.getWifiIP();
networkBssid = await networkInfo.getWifiBSSID();
} else {
return false;
// if (deviceIpOnTheNetwork == null) {
// // Issue https://github.com/CyBear-Jinni/cbj_app/issues/256
// return left(
// const HubFailures
// .findingHubWhenConnectedToEthernetCableIsNotSupported(),
// );
// }

// currentDeviceIP = deviceIpOnTheNetwork;
// networkBSSID = 'no:Network:Bssid:Found';
// networkName = 'noNetworkNameFound';
// if (isThatTheIpOfTheHub != null && isThatTheIpOfTheHub) {
// return insertHubInfo(
// networkIp: currentDeviceIP,
// networkBSSID: networkBSSID,
// networkName: networkName,
// );
// }
}
if (appDeviceIp == null) {
return false;
}
final String subnet =
appDeviceIp.substring(0, appDeviceIp.lastIndexOf('.'));

logger.i('Hub Search subnet IP $subnet');

// TODO: Search for hub
final Stream<ActiveHost> devicesWithPort =
HostScanner.scanDevicesForSinglePort(
subnet,
hubPort,

/// TODO: return this settings when can use with the await for loop
// resultsInIpAscendingOrder: false,
timeout: const Duration(milliseconds: 600),
);

await for (final ActiveHost activeHost in devicesWithPort) {
logger.i('Found Cbj Hub device: ${activeHost.address}');
hubIp = activeHost.address;
return true;
// if (networkBSSID != null && networkName != null) {
// return insertHubInfo(
// networkIp: activeHost.address,
// networkBSSID: networkBSSID,
// networkName: networkName,
// );
// }
}
} catch (e) {
logger.w('Exception searchForHub\n$e');
}
}

@override
void setEntityState(ActionObject action) {}

@override
Stream<MapEntry<String, DeviceEntityBase>> watchEntities() {
entitiesStream?.close();

entitiesStream = StreamController.broadcast();
return entitiesStream!.stream;
}

@override
Stream<MapEntry<String, AreaEntity>> watchAreas() {
areasStream?.close();

areasStream = StreamController.broadcast();
return areasStream!.stream;
}

@override
Future setNewArea(AreaEntity area) async {}

@override
Future setEtitiesToArea(String areaId, HashSet entities) async {}

@override
Future addScene(SceneCbjEntity scene) async {}

@override
Future<HashMap<String, SceneCbjEntity>> getScenes() async => HashMap();

@override
Future activateScene(String id) async {}

@override
Future<void> loginVendor(VendorLoginEntity value) async {}

@override
Future<List<VendorEntityInformation>> getVendors() async =>
IcSynchronizer().getVendors();

@override
Future<bool> connect() async {
searchDevices();
if (hubIp == null) {
return false;
}

connectDirectlyToHub();

return true;
}

Future<Either<HubFailures, Unit>> askLocationPermissionAndLocationOn() async {
final Location location = Location();

bool serviceEnabled;
PermissionStatus permissionGranted;

int permissionCounter = 0;
int disabledCounter = 0;

// Get location permission is not supported on Linux
if (Platform.isLinux || Platform.isWindows) {
return right(unit);
}

while (true) {
permissionGranted = await location.hasPermission();
if (permissionGranted == PermissionStatus.denied) {
permissionGranted = await location.requestPermission();
if (permissionGranted != PermissionStatus.granted) {
logger.e('Permission to use location is denied');
await Future.delayed(const Duration(seconds: 10));

permissionCounter++;
if (permissionCounter > 5) {
permission_handler.openAppSettings();
} else if (permissionCounter > 7) {
return const Left(HubFailures.unexpected());
}
continue;
}
}

serviceEnabled = await location.serviceEnabled();
if (!serviceEnabled) {
serviceEnabled = await location.requestService();
if (!serviceEnabled) {
disabledCounter++;
if (disabledCounter > 2) {
return const Left(HubFailures.unexpected());
}
logger.w('Location is disabled');
await Future.delayed(const Duration(seconds: 5));
continue;
}
}
break;
}
return right(unit);
}

/// Connect directly to the Hub if possible
Future<void> connectDirectlyToHub() async {
if (hubIp == null) {
return;
}

// Socket? testHubConnection;
// try {
// testHubConnection = await Socket.connect(
// hubIP,
// hubPort,
// timeout: const Duration(milliseconds: 500),
// );
// await testHubConnection.close();
// testHubConnection.destroy();
// testHubConnection = null;
// } catch (e) {
// await testHubConnection?.close();
// testHubConnection?.destroy();
// }

try {
// // TODO: DELTE
// await HubClient.createStreamWithHub(
// hubIp!,
// hubPort,
// );

channel = ClientChannel(
hubIp!,
port: hubPort,
options:
const ChannelOptions(credentials: ChannelCredentials.insecure()),
);

channel!.onConnectionStateChanged.listen((event) {
logger.i('gRPC connection state $event');
});

stub = CbjHubClient(channel!);

final ResponseStream<RequestsAndStatusFromHub> response =
stub!.clientTransferEntities(
appMessagesToHub.stream,
);

// appMessagesToHub.sink
// .add(ClientStatusRequests(sendingType: SendingType.firstConnection));

hubMessagesToApp.addStream(response);
await Future.delayed(const Duration(seconds: 3));
} catch (e) {
logger.e('Caught error while stream with hub\n$e');
await channel?.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ class _NoneConnectionService implements ConnectionsService {

@override
Future<List<VendorEntityInformation>> getVendors() async => [];

@override
Future<bool> connect() async => true;
}
Loading

0 comments on commit ca02e27

Please sign in to comment.