十进制主义者的密码

0 点赞
编程农场
转载

Collection of my code. Includes: Parallelization --- Cactus: Shakersort --- Sunflower: Buckets --- Pumpkin: Queue --- Polyculture: random Chain or special-crop --- Maze: Multidrone Cheese Intro Steam guide for game version V1.0 updated as of 2025 Oct 19 This steam guide's thumbnail is completely unrelated. I just chose a funny square image that I had on my desktop. I love that meme. I might have missed some steam text formatting such as [i] in-code becoming italics. Use your best judgement! I'm enjoying the Oct 10 V1.0 update very much. I'm just uploading what I have currently. I don't really have any automation routines for you nor any meta-optimization algorithms for you. Just basic code. I follow github etiquette. My code is your code, and your code is my code, and we all steal are inspired off of each other. I'm just a mediocre coder. Comment any fixes or improvements for me to steal for readers to take note of. Unfortunately I'm also not a beginner to programming. I won't be able to help you if you don't already know some semblance of Pythonic or pseudocoding. Lastly, I'm guessing that <code> in this game is Pythonic custom DSL. A lot of syntactic sugar is missing, such as: using backslashes to indicate a continued line of code list comprehensions nonlocal keyword yield keyword and generator functions pointer passing (eg. **args) etc. I'm sure you'll find some missing too Notes TODO list for me - DONE **args / Parallelism for non Axis-Aligned (AA) subtasks/drone-jobs using nonzero-arg subtasks which can handle inputs defined by Factory functions - DONE Pumpkins parallelization - DONE Polyculture parallelization - Figure out how to detect the first-finished multiprocess drone job for the "long" parallelization strategy, instead of naively pulling the first-started drone job. This will improve speed for "long" strategy if the drone task does not have uniform job length / time taken. - "finding" code for other parts of the game Wish list to dev - using "== None" ticks my brain, years "is None" down the drain - Try/Exception - Using blackslash to indicate continue on newline of code - List comprehensions - yield keyword and generator functions - Immediate iterable decoupling for dicts (for k, v in dict:) - global keyword accepts multiple variables at once - OOP Additional Notes - The game uses wraparound movement. Treat the game board as an infinite tiling or a torus. - My coding style is to be "generally safe", which includes stuff like function parameter checking, etc. This introduces additional if statements and code for the drone to slog through, so if you really care about speeding up code by a couple ticks I recommend you make a copy and remove those checks. If you want to use code for leaderboard running please do this. - Whenever I use coordinates, I always use one argument (x, y) instead of two separate args x and y. It's a matter of personal preference I guess. (Also the function get_companion() returns the same type, a coordinate 2-tuple) - Disclaimer: For multiprocessing/parallelization code, I only ever coded those with worldsize and n_drones = 32. I haven't really tested the other cases such as multidrone not unlocked, or weird drone-worldsize combinations. Let us proceed fellow farmers. Helpers utils_num def is_even(num): return num % 2 == 0 def is_odd(num): return not is_even(num) def clamp(num, left, right): # Clamp a number between a given min and max. return min(max(num, left), right) def ring_clamp(num, width): return num % width utils_list I commonly use a 1d list to keep track of information on the game board. 2d matrix is too complicated for the scope so I just flatten it. You'll see me call this "serialized" version or something. def all(ls): for x in ls: if not x: return False return True def any(ls): for x in ls: if x: return True return False def index(ls, item): for i in range(len(ls)): if ls[i] == item: return i return None # TODO raise exception def max_with_index(ls): # Args: # ls: list # Returns: # value: maximum item # index: maximum index if not ls: return None, -1 max_val = ls[0] max_idx = 0 for i in range(1, len(ls)): cur_val = ls[i] if cur_val > max_val: max_val = cur_val max_idx = i return max_val, max_idx def extend(ls1, ls2): # Extend ls1 with ls2's values. for x in ls2: ls1.append(x) def reversed(ls): # Reverse the list. res = [] for i in range(len(ls) -1, -1, -1): res.append(ls[i]) return res def enumerate_list(ls): # Returns a list of (idx, val) pairs i = 0 result = [] for item in ls: result.append((i, item)) i = i + 1 return result def serialize_2d_index(idx, dim_len): # Args: # idx: 2-tuple # dim_len: int length of 0th dimension # Returns: # int: serialized index return idx[1] * dim_len + idx[0] def deserialize_2d_index(idx, dim_len): # Args: # idx: int # dim_len: int length of 0th dimension # Returns: # 2-tuple: 2d matrix index return idx % dim_len, idx // dim_len def serialize_2d_matrix(m): # Flattens a 2d matrix into a list. # Args: # m: List[List] # Returns: # list: flattened 1d matrix res = [] for i in range(len(m)): extend(res, m[i]) return res def deserialize_2d_matrix_CC(m, dim_len): # Rebuilds 2d matrix from 1d. # Uses the Contiguous Chunks strategy. # Args: # m: List # dim_len: int 0th dim target length (number of rows) # Returns: # list[list]: 2d matrix or None if invalid if dim_len <= 0: return None n = len(m) if n % dim_len != 0: return None # must divide evenly for strict reshape # build dim_len rows cols = n // dim_len res = [] i = 0 while i < dim_len: res.append([]) i = i + 1 # fill row-major (contiguous chunks) i = 0 while i < n: row_idx = i // cols res[row_idx].append(m[i]) i = i + 1 return res def deserialize_2d_matrix_unsafe_CC(m, dim_len): # Rebuilds a 2D matrix from a 1D list, # distributing items as evenly as possible. # Uses the Contiguous Chunks strategy. # Args: # m: list # dim_len: int 0th dim target length (number of rows) # Returns: # list[list]: 2D "best-effort" matrix distributed CC if dim_len <= 0: return [m] n = len(m) if n == 0: temp = [] for _ in range(dim_len): temp.append([]) return temp # Compute base size and remainder # (extra elements to distribute) rows = dim_len base = n // rows remainder = n % rows res = [] idx = 0 i = 0 while i < rows: # Give one extra element to # the first `remainder` rows if i < remainder: extra = 1 else: extra = 0 row_len = base + extra row = m[idx:idx + row_len] res.append(row) idx += row_len i += 1 # If some elements remain (shouldn't # happen), append them all to # the last row if idx < n: res[-1].extend(m[index:]) return res def deserialize_2d_matrix_RR(m, dim_len): # Rebuilds 2d matrix from 1d. # Uses the Round Robin strategy. # Args: # m: List # dim_len: int 0th dim target length (number of rows) # Returns: # list[list]: 2d matrix or None if invalid if dim_len <= 0: return None # NOTE update when list comps res = [] i = 0 while i < dim_len: res.append([]) i = i + 1 n = len(m) if n == 0: return res if n % dim_len != 0: return None # must divide evenly for strict reshape # build exactly dim_len rows res = [] i = 0 while i < dim_len: res.append([]) i = i + 1 # round robin placement for idx, item in enumerate_list(m): res[idx % dim_len].append(item) return res def deserialize_2d_matrix_unsafe_RR(m, dim_len): # Rebuilds a 2D matrix from a 1D list, # distributing items as evenly as possible. # Uses the Round Robin strategy. # # Args: # m: list # dim_len: int 0th dim target length (number of rows) # Returns: # list[list]: 2D "best-effort" matrix distributed RR if dim_len <= 0: return [m] n = len(m) # NOTE update when list comps [[] for _ in range(dim_len)] res = [] for _ in range(dim_len): res.append([]) if n == 0: return res # round robin placement for idx, item in enumerate_list(m): res[idx % dim_len].append(item) return res utils_move For some reason, you can't import "hidden" functions (names starting with underscore) like with Python. Thus my "_goto_y()" will just be "goto_y()". from utils_num import * from utils_list import * from utils_drone import * def get_pos(): return (get_pos_x(), get_pos_y()) def _shortest_delta(curr, dest, size): d = (dest - curr) % size if d > size / 2: d -= size return d def goto_x(tx): ws = get_world_size() cx = get_pos_x() dx = _shortest_delta(cx, tx, ws) if dx > 0: for _ in range(dx): move(East) elif dx < 0: for _ in range(-dx): move(West) return True def goto_y(ty): ws = get_world_size() cy = get_pos_y() dy = _shortest_delta(cy, ty, ws) if dy > 0: for _ in range(dy): move(North) elif dy < 0: for _ in range(-dy): move(South) return True def goto(target): # The drone makes use of wraparound. # Args: # target: Tuple of length 2 # Returns: # bool: Success state tx, ty = target goto_x(tx) goto_y(ty) return True def u_turn(dir): # Given a direction, return the # opposite direction if dir == North: return South elif dir == South: return North elif dir == East: return West elif dir == West: return East else: return None def calculate_p_AA_positions(axis): # Calculates a list of all positions # across an axis, ordered farthest # distance first, inbetweens # alternating direction, and # closest distance last. # For the use of reaching the # farthest point first when starting # an axis aligned drone task. # Args: # axis: "x" or "y" # Returns: # list[int] recompute_ws() positions = [] if axis == "x": temp_pos = get_pos_x() else: temp_pos = get_pos_y() farthest = temp_pos - (ws // 2) if farthest < 0: farthest += ws for i in range(1, ws+1): if is_odd(i): d = i // 2 else: d = - i // 2 positions.append(ring_clamp(farthest + d, ws)) return positions utils_item def check_has_item(item, num=1): # DEPRECATED use can_pay_for() # Args: # item: obj under class Items # num: number to check if num_items(item) < num: return False return True def can_pay_for(entity, num=1): # Checks if I can pay for the # desired item(s). costs = get_cost(entity) if not costs: return True for k in costs: v = costs[k] if not check_has_item(k, v * num): return False return True Helpers Pt 2 utils_grow from utils_item import * from utils_num import * # Used for logic in polyculture entity_growth_timing = {# Entity: tuple[float, float], in seconds Entities.Apple: (0.2, 0.2), Entities.Bush: (3.2, 4.8), Entities.Cactus: (1.0, 1.0), Entities.Carrot: (4.8, 7.2), Entities.Dead_Pumpkin: (0, 0),# JIC Entities.Dinosaur: (0.18, 0.22), Entities.Grass: (0.5, 0.5), Entities.Hedge: (0, 0),# JIC Entities.Pumpkin: (0.2, 3.8), Entities.Sunflower: (5.6, 8.4), Entities.Treasure: (0, 0),# JIC Entities.Tree: (5.6, 8.4), } entity_allowed_groundtype = {# Entity: tuple containing allowed ground types Entities.Apple: (Grounds.Grassland, Grounds.Soil), Entities.Bush: (Grounds.Grassland, Grounds.Soil), Entities.Cactus: (Grounds.Soil), Entities.Carrot: (Grounds.Soil), Entities.Dead_Pumpkin: (Grounds.Soil),# assign as if pumpkin Entities.Dinosaur: (Grounds.Grassland, Grounds.Soil), Entities.Grass: (Grounds.Grassland, Grounds.Soil), Entities.Hedge: (),# JIC. Entities.Pumpkin: (Grounds.Soil), Entities.Sunflower: (Grounds.Soil), Entities.Treasure: (),# JIC. Entities.Tree: (Grounds.Grassland, Grounds.Soil), } entity_needs_soil = { # Exclusively needs soil. Entities.Cactus, Entities.Carrot, Entities.Pumpkin, Entities.Sunflower, } entity_needs_grassland = set()# Exclusively needs grassland. def calculate_actual_growth_time(): # Account for water level: linear interpolation # which is z = val * (left*(1-y) + right*(y)) ent = get_entity_type() if ent == None or ent not in entity_growth_timing: return 0 return entity_growth_timing[ent][0] * (1 / (1 + 4 * get_water())) def maintain_water(thresh): # Args: # thresh: float threshold to maintain above # Returns: # bool: Success state if not check_has_item(Items.Water): return False thresh = clamp(thresh, 0, 1) while get_water() < thresh: use_item(Items.Water) return True def till_for_entity(ent): # Prepare the ground for planting. # Simpler version but less safe: # if get_ground_type() not in entity_allowed_groundtype[ent]: # till() # ground = get_ground_type() if ent in entity_needs_soil and ground != Grounds.Soil: till() elif ent in entity_needs_grassland and ground != Grounds.Grassland: till() return True utils_farm from utils_drone import * from utils_move import * def _factory_till_row(args_tuple): start_pos = args_tuple def _subtask_till_row(): recompute_ws() goto(start_pos) for _ in range(ws): if get_ground_type() != Grounds.Soil: till() move(East) return True return _subtask_till_row def _subtask_harvest_eastwards(): recompute_ws() for _ in range(ws): harvest() move(East) return True def till_all(): recompute_parallelism() px = get_pos_x() args = [] positions = calculate_p_AA_positions("y") for i in range(ws): args.append((px, positions[i])) return drone_task(_factory_till_row, args) def harvest_all(): # Use in case you planted an expensive field of crops. recompute_parallelism() return drone_naive_AA_task(_subtask_harvest_eastwards, North) def randcoords(): recompute_ws() return ((random() * ws) // 1, (random() * ws) // 1) def get_companion_saferand(): # Returns a random coordinate if # get_companion() fails. ret = (get_companion(),) if ret[0] == None: return None, randcoords() else: return ret[0][0], ret[0][1] utils_tick def naive_wait(ticks): # Wait for ticks ticks. # This function is unsafe. # This function has overhead (adjusted). # 1: Evaluate ticks - 3 # 1: Start for loop # 1: IDK for _ in range(ticks - 3): pass utils_tick def naive_wait(ticks): # Wait for ticks ticks. # This function is unsafe. # This function has overhead (adjusted). # 1: Evaluate ticks - 3 # 1: Start for loop # 1: IDK for _ in range(ticks - 3): pass Parallelization If you don't have any experience with parallelization specifically multiprocessing as a means to achieve parallelization, then this part will be quite hard. Thankfully for me, I'm in uni and have taken an OS course which covered scheduling, multiprocessing, related topics etc so at least I'm not hopeless. (What the hell is a semaphore haha) I wrote most of my parallelization functions in hopes that they can be used independently of Megafarm unlock level. I'm not sure if it works though, I haven't tried to simulate it. (eg. Hopefully, prevents using the function max_drones() if megafarm hasn't been unlocked, as that will raise errors). Disclaimer, Caution: I've only tried all my parallelization code on the "short" mode with worldsize = max_drones() = 32. I finally have a solution for **args-type subtasks!!! I sincerely hope that the drone_task function can help, I spent a lot of time on that. The way I deal with it is to treat **args as a tuple object containing m arguments to pass in. Then any subtask should be defined as taking in one tuple parameter, and can access parameters through iterating the tuple. Doesn't this remind you of how to use the Stack when programming in Assembly? :D Also, this is the one time I've unironically used the factory design pattern. Thank you to this game for making me write "clean" code. Examples for how to make factory functions can be found in my implementations for polyculture, sunflowers, etc. utils_drone # A solution to **args is to design the subtask to accept a tuple parameter. def max_drones_safe(): # Requires having unlocked Auto Unlock. if num_unlocked(Unlocks.Megafarm) < 1: return 1 else: return max_drones() # Drone parallelism def choose_parallelism(ws): # Recompute and decide how the main drone participates in parallel work. # # My definition of parallelism assumes the work that needs # parallelism needs so under extension on one dimension # of the game board. # # Heuristic: # - If Megafarm is locked: no parallelism ("none"). # - Prefer "short" (main drone is a worker) when: # - The field width divides evenly among all drones, or # - The total number of drones is small relative to worldsize. # - However this assumes subtasks complete in roughly uniform # time, and that the n_drones*200ticks time overhead nearing # all subtasks' completion is acceptable. It could cause idle # drones or imbalance if subtask duration is random. # - Prefer "long" (main drone only delegates tasks) when: # - Removing the main drone yields an even partition, or # - The division is cleaner (greater GCD) without the main. # - There is a job requirement to proactively respawn drones # or subtask length follows a non-uniform distrubution. # - But I haven't found a solution to computing the first # job-completed drone task to efficiently manage drones # before they become idle. (TODO) # This balances even workload distribution with minimal idle time. # Megafarm not unlocked → no parallelism if get_cost(Unlocks.Megafarm) == get_cost(Unlocks.Megafarm, 0): return "none" n = max_drones() # Safety. If only main drone exists: it must work if n <= 1: return "short" # If we have more drones than dimension, either mode will work. # "long" somehow seems safer. I can't explain. if n > ws: return "long" # If workforce is small, subtask-ending-overhead shouldn't be large if n <= 4: return "short" # Prefer an even partition using all workers (main included) if ws % n == 0: return "short" # If not even, see if excluding the main gives an even split if ws % (n - 1) == 0: return "long" # If workforce is small vs width, every hand helps if (n - 1) < ws // 4: return "short" # Fallback: pick the mode that yields fewer ragged strips (GCD-based) def _gcd(a, b): while b: a, b = b, a % b return a g_all = _gcd(ws, n) g_nomin = _gcd(ws, n - 1) # Larger GCD → cleaner tiling. If tie, prefer delegator for less contention. if g_all >= g_nomin: return "short" else: return "long" def recompute_ws(): global ws ws = get_world_size() def recompute_parallelism(new_type=None): # Corner case against runtime upgrades of worldsize or n_drones. # Call at the start of big entry points like till_all() or maintask(). # Args: # new_type: manual application of one of "none", "short", "long" recompute_ws() global parallelism_type if new_type == None: parallelism_type = choose_parallelism(ws) else: parallelism_type = new_type def drone_task(factory, args, local_p_type=None): # This function is for usage with a "N independent jobs" pattern. # Convention: # Ensure subtasks are "closed" w.r.t. starting position. This is # in case of the "short" mode needing main drone to be a worker. # If not, ensure so externally around your function call. # Args: # factory: factory function taking m parameters which # can cleanly create zero-arg subtasks. # args: list of m-tuple of arguments to pass into factory. # (implied) length: number of jobs. Inferred as n=len(args) # local_p_type: local scope parallelism_type. # Useful for second-order delegators. # Returns: # results: list of n results length = len(args) if length == 0: # Trivial return [] if local_p_type != None: p_type_to_use = local_p_type else: # global keyword not needed for readonly p_type_to_use = parallelism_type _drones = [] for _ in range(length): _drones.append(None) results = [] for _ in range(length): results.append(None) if p_type_to_use == "none": # Does not account for drone parallelism. for i in range(length): res = factory(args[i])() results[i] = res return results elif p_type_to_use == "short": # Uses the main drone as a worker. for i in range(length): subtask_watcher = spawn_drone(factory(args[i])) if subtask_watcher: _drones[i] = subtask_watcher else: res = factory(args[i])() results[i] = res for i in range(len(_drones)): if _drones[i] != None: results[i] = wait_for(_drones[i]) return results elif p_type_to_use == "long": # Uses the main drone as a task delegator. _drones_idx = []# Keeps track of index w.r.t. results list. counter, limit = 0, length while counter < limit: if len(_drones) < max_drones(): subtask_watcher = spawn_drone(factory(args[counter])) if subtask_watcher: _drones.insert(0, subtask_watcher) _drones_idx.insert(0, counter) counter += 1 else: # n_drones under capacity but spawn_drone() failed. quick_print("Error: unexpected spawn_drone() failure") return None if len(_drones) >= max_drones(): # TODO corner case for non-uniform subtask timing # Detect the first-job-exited drone instead of # the first-job-started drone idx = _drones_idx.pop() res = wait_for(_drones.pop()) results[idx] = res while _drones: idx = _drones_idx.pop() res = wait_for(_drones.pop()) results[idx] = res return results else: quick_print("Error: Unknown parallelism type:", p_type_to_use) return None # Insert Axis Aligned Drone Task code here because the code section was too long to put into 1 steamguide section if __name__ != "__main__": ws = get_world_size() parallelism_type = choose_parallelism(ws) quick_print("parallelism_type:", parallelism_type) Parallelization Pt 2 This code for parallelization is pretty old. You should probably use the generalized drone_task instead. def drone_naive_AA_task(subtask, dir, length=None): # OLD CODE use drone_task() instead # Starts a naive Axis-Aligned subtask along an axis. # Make sure the drone is in the right place before starting. # Movement assumes drone can wraparound the game board. # Convention: # TODO while I figure out **args, subtasks cannot take inputs. # Ensure subtasks are "closed" w.r.t. starting position. This is # in case of the "short" mode needing main drone to be a worker. # If not, ensure so externally around your function call. # Subtasks returning bool are success states. False = Failure # TODO take care of subtasks returning results. I havent figured # out what to do if a subtask ran successfully but returns 0 results, # as that is a Falsy evaluation. # Example: # Shakersorting along one dimension on the Cactus puzzle # Args: # dir: Cardinal direction # mode: one of "none", "short" or "long" # subtask: name of function to perform # Returns: # bool: success state # Is the reason for naive. if length == 0:# Trivial return True elif length == None: # "== None" looks so wrong. But "is None" doesn't exist in # the in-game Pythonic DSL. # Putting length=ws as default param binds it at import time. # Recalculate here to take care of runtime ws-upgrade increases. length = ws _drones = [] if parallelism_type == "none": # Does not account for drone parallelism. for _ in range(length): if not subtask(): return False move(dir) return True elif parallelism_type == "short": # Uses the main drone as a worker. for _ in range(length): subtask_watcher = spawn_drone(subtask) if subtask_watcher: _drones.append(subtask_watcher) else: if not subtask(): return False move(dir) while _drones: if not wait_for(_drones.pop()): return False return True elif parallelism_type == "long": # Uses the main drone as a task delegator. counter, limit = 0, length while counter < limit: if len(_drones) < max_drones(): subtask_watcher = spawn_drone(subtask) if subtask_watcher: _drones.insert(0, subtask_watcher) counter += 1 move(dir) else: # n_drones under capacity but spawn_drone() failed. quick_print("Error: unexpected spawn_drone() failure") return False if len(_drones) >= max_drones(): if not wait_for(_drones.pop()):# TODO detect the first # exited drone instead of first-job-started drone return False while _drones: if not wait_for(_drones.pop()): return False return True else: print("Error: Unknown parallelism type:", parallelism_type) return False Basic Crops Left as an exercise for the reader :) Important: Always prioritize polyculture. Only use naive strategies like this if you need an amount of a certain crop and you haven't unlocked PC. Polyculture is exclusively an output multiplier and it does not apply a proportionate cost penalty. It drastically increases the ratio of harvest reward to entity-planting cost! Time efficiency is increased due to the above. The amount of non-polyculture entities you have to harvest to get the same output rates as a strategy that makes use of polyculture, scales proportionally to the inverse of the current polyculture multiplier. I.e., for the same output rate, a polyculture-enabled strategy needs 1/(PC mult) the time. Grass Naive naive double for loop Bushes Naive naive double for loop Carrots Naive naive double for loop Parallelized p_naive_carrots from utils_list import * from utils_drone import * from utils_grow import * from utils_farm import * from utils_move import * from utils_item import * WATER_THRESH = 0.75 def stop_condition(): # Specify your stopping condition here. return num_items(Items.Carrot) > 10000000000 def _factory_carrots_row(args_tuple): # args_tuple: (row_index,) row_index = args_tuple[0] def job(): start = get_pos() while not stop_condition(): goto((0, row_index)) for _ in range(ws): if can_harvest(): harvest() plant(Entities.Carrot) maintain_water(WATER_THRESH) move(East) goto(start) return True return job def maintask(): recompute_parallelism() args = [] i = 0 while i < ws: args.append((i,)) i += 1 drone_task(_factory_carrots_row, args) return True if __name__ == "__main__": clear() till_all() maintask() Trees Naive clear() ws = get_world_size() for i in range(ws): for j in range(ws): if (i % 2 == 1 and j % 2 == 0) or (i % 2 == 0 and j % 2 == 1): plant(Entities.Tree) move(East) move(North) while True: for i in range(ws): for j in range(ws): curr_ent = get_entity_type() if curr_ent == Entities.Tree and can_harvest(): harvest() plant(Entities.Tree) move(East) move(North) Parallelized Left as an exercise to the reader. General Polyculture (Parallelization) I just want to say that I'm thankful for school for teaching me clean code and what a Factory design pattern even is, because the more I use it for updating my old parallelization code, the more thankful I get. Notes As far as I know, "polyculture" is a yield multiplier for an existing, planted crop ("target"), that only activates if another specific crop (requested upon planting the "target", of which, is called the "companion") exists at a specific location, no matter the growth status nor infected status of the companion. It just has to exist and the multiplier will activate for the target. The request range will be limited (in-game docs say the range is 3). In addition, it does not apply a proportionate planting-cost penalty. Additional (future) implementation idea Drones work in pairs. One plants the start of a chain of companions, and one harvests the end of that chain of companions. I'm unsure of how to proceed with this though, because I don't want to deal with all the race conditions I imagine must be plaguing the scenario. As always, I leave it as an exercise to the reader. Naive / chain-random Here is my solution for polyculture. Thanks to >ANY KEY_'s guide for the chain-random idea. The code works on the principle that we plant chains of companions. At some random point in the future, the head of a chain will randomly happen to intersect a middle-link of the chain. We will harvest the crop in this chain intersection that already has its companion planted and replace it with the head of the new chain. When a drone is continuing the chain, it will always harvest() before replacing the tile with the next required companion to continue the chain. This gives us amortized results, where the algorithm requires some startup time to ramp up results / populate the game board with the chains. I realize it's not the most efficient but hey at least it works on my machine. In addition, it is just hopium that plants grow faster then the time it takes for the next drone to randomly come to the tile and harvest it. That's why I used water. The rates are in all honesty inefficient for the potential of parallelization. I'm still looking for improvements to steal. If you want a more efficient script that will farm all polyculture-enabled crops at a time, please modify the "onecrop" strategy in the below section to plant different crops based on a modulus rule on what column/row a onecrop tile is in. Farming results follow the distribution of potential companions. Which I assume is uniform. get_companion() is not None-safe, so I made my own version and put it into utils_farm. Please check that section for updated code. TODO there will still be some warnings for failure to plant on a different tile type. This is due to a race condition I haven't figured out how to solve, but in the meantime the code will just skip hte tile and continue so it should work okay. p_pc # parallel polyculture. from utils_move import * from utils_farm import * from utils_grow import * from utils_drone import * # Hyperparameter chance for drone not to follow its companion. # Should prevent drones bunching up. # TODO Tune this later # Tuning: 0.10: some warnings for planting nontill-plants on # tilled tiles and conversely for till-plants # Around 0.20 is fine. Still some warnings in the long run though. HYP_RANDMOVE_CHANCE = 0.20 WATER_THRESH = 0.75 def stop_condition(): # Write your desired target # condition to stop the run at. # True if condition fulfilled. # I have it set currently for Top Hat unlock. # below is just to shorten line width. _a = num_items(Items.Hay) > 1000000000 _b = num_items(Items.Wood) > 10000000000 _c = num_items(Items.Carrot) > 1000000000 return all([_a, _b, _c]) def _subtask_pc(): while True: if stop_condition(): return True if random() < HYP_RANDMOVE_CHANCE: goto(((random() * ws) // 1, (random() * ws) // 1)) continue companion, (x, y) = get_companion_saferand() goto((x, y)) if companion == None: continue if can_harvest(): harvest() till_for_entity(companion) plant(companion) maintain_water(WATER_THRESH) def _factory_subtask_pc(args_tuple): start_pos = ((random() * ws) // 1, (random() * ws) // 1) def _subtask_randstart_pc(): goto(start_pos) return _subtask_pc() return _subtask_randstart_pc def maintask(): recompute_parallelism() if parallelism_type == "none": _subtask_pc() elif parallelism_type in ("short", "long"): # The subtask is not finite. Therefore "long" # has no use. Use "short" drone strategy. recompute_parallelism("short") args = [] for _ in range(max_drones()): args.append(()) drone_task(_factory_subtask_pc, args) else: quick_print("Error: Unknown parallelism type:", parallelism_type) return None if __name__ == "__main__": clear() # We start at (0, 0) with a wheat entity. # After planting something, we plant its companion. # Rely on randomness to come back to the original plant # meaning there will be some start-up time required. maintask() Specialized Polyculture (Parallelization) I made this code that focused on farming one crop using polyculture. This is specifically for getting a large amount of a crop in a short timeframe. As with the explanation under the Basic Crops section, always prioritize using this strategy over non-polyculture strategies (such as filling the entire field with carrots). This is in two parts due to the (1) Cost ratio efficiency, and (2) Time-to-harvest efficiency. I can't really be bothered to explain it all because I've already left a bunch of code comments in there. Please read them over for explanation. If you choose to use fertilizer on an expensive crop w.r.t. growth time, it might use up a ton of it. The Tree is the slowest by far, at a growth time between (4.8, 7.2) seconds. So stock up before trying for achievements. If you want to use fertilizer, I highly suggest you use water at the same time, with the "both" speedup strategy. There's also no cost checking because I assume you're already well-established in terms of what you have in stock. TODO that might be for improvement in case anybody wants to use the code in their automation routines or meta-optimization algorithms. Fix: fert usage didnt account for water level. Now improved fert usage Moved some logic away into utils_grow p_pc_onecrop # Optimizes the output of a specifc crop # exclusively, using polyculture. # One of the use cases of fertilizer since # the surrounding crops don't matter. # All we need is for them to exist to be # counted as a companion plant. # For Weird Substance farming, please enable # the relevant flag and farm grass. from utils_list import * from utils_drone import * from utils_grow import * from utils_farm import * from utils_move import * from utils_item import * ONECROP_TILE_SPACING = 5 # Caution: uses a lot of fertilizer. Always prioritize # the "both" strategy over solely "fert". # With 32 drones, farming trees, the "both" strategy # costs approximately 100fert/sec, or 6k/min. # Keep in mind to stock up before trying for achievements. SPEEDUP_STRATEGY = "both" # One of "none", "water", "fert", "both" WATER_THRESH_ONECROP = 0.95 FLAG_FARM_WEIRD_SUBSTANCE = False TARGET_CROP = Entities.Carrot# Don't forget inside def stop_condition():# this function!!! # Specify your stopping condition here. return num_items(Items.Carrot) > 10000000000 def get_onecrop_indices(): # Infer if onecrop's pc tiling range (3) in # a direct overlap with another onecrop is # acceptable based on user's selected SPACING. # This function computes overlap acceptability # in a relaxed manner; # (min of PC spacing and selected spacing.) recompute_ws() overlap_acceptable = ONECROP_TILE_SPACING <= 3 step = ONECROP_TILE_SPACING indices = [] y = 0 while y < ws: x = 0 while x < ws: indices.append(y * ws + x) x += step y += step if not overlap_acceptable: good_indices = []# Memory in this game is pretty much free for i in range(len(indices)): temp = indices[i] # Consider the 0th col always occupied. if not temp % ws > ws - min(3 + 1, ONECROP_TILE_SPACING): good_indices.append(temp) indices = good_indices return indices def _factory_goto_and_plant(args_tuple): companion = args_tuple[0] pos = args_tuple[1] def _subtask_goto_and_plant(): goto(pos) harvest()# destroy current plant till_for_entity(companion) if not plant(companion): pass # Don't return False here. # Just keep going and fail silently return True return _subtask_goto_and_plant def _factory_onecrop(args_tuple): pos = args_tuple[0] local_p_type = args_tuple[1] def _subtask_onecrop(): flag_has_used_fert_this_round = False goto(pos) # This is a temporary measure while I figure out how # to delay high-level drones from spawning more drones # while other high level drones are being spawned by # the main drone. do_a_flip() do_a_flip() till_for_entity(TARGET_CROP) plant(TARGET_CROP) while not stop_condition(): if can_harvest(): if FLAG_FARM_WEIRD_SUBSTANCE: use_item(Items.Weird_Substance) harvest() flag_has_used_fert_this_round = False # Use this if any onecrop's pc tiling # range (3) overlaps with another onecrop # directly, where obstruction can occur if # another onecrop's companion needs to be on # the current onecrop's tile. if ONECROP_TILE_SPACING <= 3: till_for_entity(TARGET_CROP) plant(TARGET_CROP) companion, com_pos = get_companion_saferand() args = [(companion, com_pos)]# length 1 prev_pos = get_pos() drone_task(_factory_goto_and_plant, args, local_p_type) goto(prev_pos) else: # Always water before using fert. Water decreases # growth time multiplicatively while fert is additive. if SPEEDUP_STRATEGY in ("both", "water"): maintain_water(WATER_THRESH_ONECROP) if SPEEDUP_STRATEGY in ("both", "fert"): if not flag_has_used_fert_this_round: # Fertilizer decreases growth time by 2 seconds. actual_timing = calculate_actual_growth_time() num_ferts_to_use = actual_timing // 2 + 1 for _ in range(num_ferts_to_use): use_item(Items.Fertilizer) flag_has_used_fert_this_round = True if flag_has_used_fert_this_round: use_item(Items.Weird_Substance) if SPEEDUP_STRATEGY in ("none"): do_a_flip()# Conserve energy. return True return _subtask_onecrop def main(): # In this problem there are two locations # of parallelization. 1: initial drone # to each onecrop tile, and 2: further # delegation to save time during # companion planting clear() strategy = None tile_1d_indices = get_onecrop_indices() temp = len(tile_1d_indices) # TODO consider recompute_parallelism # manually since subtasks are not defined # w.r.t. worldsize? if parallelism_type == "none": local_p_type = "none" else: recompute_parallelism() local_p_type = "short" args = [] for i in range(max_drones()): args.append((deserialize_2d_index(tile_1d_indices[i], ws), local_p_type)) drone_task(_factory_onecrop, args) if __name__ == "__main__": main() Weird Substance (Old) Here's some code to quickly get some Weird Substance on a wheat field. I don't bother with a larger scope for this because as soon as you get good multipliers on outputs, you can apply WS and get half of that high output. There's not much required to unlock mazes (on the order of only a couple hundred thousand). If you require more, please see the specialized polyculture code, which has a function to farm Weird Substance as well. def should_apply_weird_s(pos): # Args: # pos: 2-tuple curr pos (x, y) # Returns: # bool: Truth value cx, cy = pos return ((2 * cx + cy) % 5) == 0 if __name__ == "__main__": clear() ws = get_world_size() for i in range(ws): for j in range(ws): if should_apply_weird_s((get_pos_x(), get_pos_y())): use_item(Items.Weird_Substance) move(East) move(North) Pumpkin (Parallelization) Thanks to Wrath for the following code becuase I got too lazy to code this. It makes parallel passes over the field and replants over empty tiles and dead pumpkins, until all pumpkins are grown, however might have some minute overhead in between harvesting and replanting. # Wrath's Pumpkin Parallelization code # PUMPKIN PARLLL from utils_list import * from utils_move import * from utils_item import * from utils_drone import * def build_row_regions(dim, workers): # Returns a list of region tile-index lists (row stripes). if workers <= 0: workers = 1 if workers > dim: workers = dim base = dim // workers rem = dim % workers regions = [] i = 0 while i < workers: if i < rem: rows_for_i = base + 1 else: rows_for_i = base start_row = i * base if i < rem: start_row += i else: start_row += rem end_row = start_row + rows_for_i tiles = [] r = start_row while r < end_row: c = 0 while c < dim: tiles.append(r * dim + c) c += 1 r += 1 regions.append(tiles) i += 1 return regions def _factory_prepare_and_plant_region(args_tuple): # args_tuple: (positions_list,) region_positions = args_tuple[0] def job(): i = 0 while i < len(region_positions): idx = region_positions[i] x, y = deserialize_2d_index(idx, ws) goto((x, y)) if get_ground_type() != Grounds.Soil: till() if not can_pay_for(Entities.Pumpkin): return False plant(Entities.Pumpkin) i += 1 return True return job def _factory_check_region_ready(args_tuple): # args_tuple: (positions_list,) region_positions = args_tuple[0] def job(): # Return True only if every tile in region is harvestable pumpkin. i = 0 all_ready = True while i < len(region_positions): idx = region_positions[i] x, y = deserialize_2d_index(idx, ws) goto((x, y)) ent = get_entity_type() if ent == Entities.Dead_Pumpkin: # Immediate fix; forces barrier to wait longer if not can_pay_for(Entities.Pumpkin): return False if get_ground_type() != Grounds.Soil: till() plant(Entities.Pumpkin) all_ready = False else: # Must be a pumpkin and must be harvestable if ent != Entities.Pumpkin or not can_harvest(): all_ready = False i += 1 return all_ready return job def _factory_harvest_region(args_tuple): # args_tuple: (positions_list,) region_positions = args_tuple[0] def job(): # Harvest all tiles in the region (they should all be ready). i = 0 while i < len(region_positions): idx = region_positions[i] x, y = deserialize_2d_index(idx, ws) goto((x, y)) if get_entity_type() == Entities.Pumpkin and can_harvest(): harvest() i += 1 return True return job def _factory_replant_region(args_tuple): # args_tuple: (positions_list,) region_positions = args_tuple[0] def job(): # Re-till if needed and replant region. i = 0 while i < len(region_positions): idx = region_positions[i] x, y = deserialize_2d_index(idx, ws) goto((x, y)) if get_ground_type() != Grounds.Soil: till() if not can_pay_for(Entities.Pumpkin): return False plant(Entities.Pumpkin) i += 1 return True return job def main(): recompute_ws() # Force short mode (main participates; avoids long-mode watcher caveats). recompute_parallelism("short") workers = max_drones() if workers < 1: workers = 1 regions = build_row_regions(ws, workers) # Build args as tuple-of-one per region args = [] i = 0 while i < len(regions): if len(regions[i]) > 0: args.append((regions[i],)) i += 1 # Initial prepare + plant across all regions results = drone_task(_factory_prepare_and_plant_region, args) if results == None: return i = 0 while i < len(results): if not results[i]: return i += 1 # Global synchronized cycles while True: # Wait until all regions report ready all_ready = False while not all_ready: checks = drone_task(_factory_check_region_ready, args) if checks == None: return all_ready = True i = 0 while i < len(checks): if not checks[i]: all_ready = False i += 1 # Harvest everywhere in parallel hres = drone_task(_factory_harvest_region, args) if hres == None: return i = 0 while i < len(hres): if not hres[i]: return i += 1 # Replant everywhere in parallel rres = drone_task(_factory_replant_region, args) if rres == None: return i = 0 while i < len(rres): if not rres[i]: return i += 1 if __name__ == "__main__": clear() main() Old implementation idea for parallelization Just noting this down. My old idea is to split the drones naively. Each drone is responsible for a "patch" (If splitting across a dimension: would be a row if n_drones == ws, and if going for a locally close patch, cut the board into rectangles). Then following the typical serial-version pumpkin growing algorithm. Workers remove themselves when their patch is fully grown. Main drone harvests and starts loop again. Pumpkin (Old) queue_pumpkin Uses queue datastructure in a smarter way compared to naive approach. Because memory in this game is practically free compared to time / tickspeed. TODO implement speedup based on straggling pumpkins. # Version that uses a head index into a list. from utils_list import * from utils_move import * from utils_item import * # TODO speedup_field_percentage = 0.2 speedup_strategy_list = ["none", "water", "fertilizer"] speedup_strategy = "none" clear() ws = get_world_size() def till_all(): goto((0, 0)) for i in range(ws): for j in range(ws): if get_ground_type() != Grounds.Soil: till() move(East) move(North) return True def replant_pumpkins(): if not can_pay_for(Entities.Pumpkin, ws * ws): print("Cannot pay for all pumkpins") return False for i in range(ws): for j in range(ws): plant(Entities.Pumpkin) move(East) move(North) return True # queue as (list, head) arr_is_grown = list(range(ws * ws)) head = 0 def q_not_empty(): return head < len(arr_is_grown) def q_pop(): # O(1) amortized pop-front global head x = arr_is_grown[head] head += 1 return x def q_push(x): arr_is_grown.append(x) def q_compact_if_needed(): # occasional compact to free consumed prefix global arr_is_grown global head if head > 1024 and head > len(arr_is_grown) // 2: arr_is_grown = arr_is_grown[head:] head = 0 till_all() replant_pumpkins() while True: if not q_not_empty(): harvest() goto((0, 0)) arr_is_grown = list(range(ws * ws)) head = 0 if not replant_pumpkins(): break while q_not_empty(): curr_pos_to_check = q_pop() goto(deserialize_2d_index(curr_pos_to_check, ws)) if not can_harvest(): if get_entity_type() == Entities.Dead_Pumpkin: if not can_pay_for(Entities.Pumpkin): break plant(Entities.Pumpkin) q_push(curr_pos_to_check) q_compact_if_needed() Naive My old code. Use the above instead. from utils_list import * from utils_move import * from utils_item import * speedup_field_percentage = 0.2 speedup_strategy_list = ["none", "water", "fertilizer"] speedup_strategy = "none" clear() ws = get_world_size() # Serialized list of positions on the field # Represents ungrown pumpkins still needed to revisit arr_is_grown = list(range(ws * ws)) def replant_pumpkins(): if not can_pay_for(Entities.Pumpkin, ws * ws): return False for i in range(ws): for j in range(ws): if get_ground_type() != Grounds.Soil: till() plant(Entities.Pumpkin) move(East) move(North) return True replant_pumpkins() while True: if not arr_is_grown: harvest() goto((0, 0)) arr_is_grown = list(range(ws * ws)) if not replant_pumpkins(): break while arr_is_grown: curr_pos_to_check = arr_is_grown.pop(0)# Killer O(n). goto(deserialize_2d_index(curr_pos_to_check, ws)) if not can_harvest(): if get_entity_type() == Entities.Dead_Pumpkin: if not check_has_item(Items.Carrot, 1): break if get_ground_type() != Grounds.Soil: till() plant(Entities.Pumpkin) arr_is_grown.append(curr_pos_to_check) Cactus (Parallelization, doesn't use Factory) Given the constraints of using a drone, we can only: Move one tile at a time Swap two adjacent cacti underneath the drone (in a cross pattern) it seems like shakersort was built for these constraints. Compared to gnomesort (manual insertion sort), this seems more efficient because we can still perform sorting (swaps) on the way back while repositioning the drone. Uses rowwise then colwise shakersort. This should have nicer big O constants on the order of n(n-1)/2 compared to naive n**2, however rarely there will be a speedup past that, because axis-sorting requires all sorting jobs to be finished before proceeding to the next axis. This per-axis sort strategy makes this sorting algorithm accessible for parallelization later on. Math 😱😱 If you haven't noticed already, sorting rowwise then sorting columns won't un-sort the rows. "Stability" of row-sortedness after column-sorting comes from the Monotonicity Principle (which comes from the Pigeonhole principle). If the initial row sort ensures that every element in column j is less than or equal to its row-neighbor in column j+1 (where $A_{i,j​} leq A_{i,j+1}​$), the relative order between the two columns is preserved regardless of how the individual columns are sorted. The k-th smallest value in column j must be less than or equal to the k-th smallest value in column j+1. In English: When you sort every row (left to right), you establish a rule that is true for every single row: nothing in column j is greater than its neighbor in column j+1. Sorting the columns up and down simply moves the largest numbers in each column to the top rows and the smallest numbers to the bottom rows. Because the rule established in step one is true for every pair of elements across the columns, the vertical sorting cannot break the horizontal order. In addition, after sorting row-wise and starting the col-wise sort, the first and last columns seem to always finish faster. I think this comes from boundary saturation. Suppose H is the maximum height of a cactus (which is 15). After you finish the row-wise sort, in each row, the first column holds that row’s minimum, and the last column holds that row’s maximum. So when you then sort by columns, the first column is a list of row-minima, and the last column is a list of row-maxima. If cactus height selection is independent and identically distributed (i.i.d.): The distribution of the row minimum concentrates near 0. the distribution of the row maximum concentrates near H. As the row length m grows, the chance that a row actually contains the absolute min (0) or the absolute max (H) grows larger, so many rows will contain at least one min (0) and/or max (15). If it doesn't contain one of these, then the chances are even higher to contain one value off of the min or max (1 or 14). Statistically, using the distribution where one row-max H exists in the row of length m: $Pr(row max) = 1 - left( frac{H}{H + 1} right) ^{m}$Pretend there's latex rendering for that code block. What that basically says is that for example values of m: m=4: 22.75% m=8: 40.33% m=16: 64.40% m=22: 75.82% m=32: 87.32%So at max worldsize, each row will have an 87% chance to contain an element of value (max = 15). Same case for the minima. Thus, a large fraction of rows will have that. After a row-wise sort, this creates lots of ties at the edges, which means fewer inversions and fewer swaps for shakersort on those edge columns, because equal-valued elements don't need swapping. Any sorting algorithm should clear these such columns much faster.Just note that this only applies after we've sorted along one dimension already. We can just barely make use of this property for parallelization. If our parallelization strategy makes use of the main drone as a worker itself (I call this strategy "short"), instead of using it as a task delegator, having the main drone work on the first or last column is useful because it will free up sooner and can deal with any logical administrative tasks immediately afterwards while slower drones take care of the middle of the field. I'm guessing it's probably only statistically significant above ws=22, and when it does apply, it'll probably be an extremely small speedup. Additional Notes I probably won't update this function to use the new drone_task type syntax. This is already clean enough and works. Improvement There is an opening for improvement where finished drones can help any remaining drones sort their rows or columns. Since sorting a row is stateless w.r.t. cactus value, and since a swap is (I suspect) an atomic operation w.r.t. timing and the tick system, additional drones can work an individual shakersort on the same row w.r.t. variable environment. Drones update their own left and right limits on their own. Requirements: Make sure "long" drone mode works by picking out the first finished job and not the first started job. Limit additional helper drones to 1. 100% boost across rows is more valuable than a 400% boost on one row. Start a shake_row task from the field-middle outwards. Code # Strategy: double shakersort # Uses parallelization via utils_drone.drone_AA_task from utils_list import * from utils_item import * from utils_farm import * from utils_move import * from utils_drone import * def shake_row(): # Shakersort the current row; uses goto_x to anchor & restore. prev_x = get_pos_x() goto_x(0) left = 0 right = ws - 1 dir = East while left < right: if dir == East: cx = get_pos_x() loc_last_swap = left while cx < right: curr = measure() nxt = measure(dir) if curr > nxt: swap(dir) loc_last_swap = cx move(dir) cx += 1 right = loc_last_swap dir = u_turn(dir) if dir == West: cx = get_pos_x() loc_last_swap = right while cx > left: curr = measure() nxt = measure(dir) if curr < nxt: swap(dir) loc_last_swap = cx move(dir) cx -= 1 left = loc_last_swap dir = u_turn(dir) goto_x(prev_x) return True def shake_col(): # Shakersort the current column; uses goto_y to anchor & restore. prev_y = get_pos_y() goto_y(0) left = 0 right = ws - 1 dir = North while left < right: if dir == North: cy = get_pos_y() loc_last_swap = left while cy < right: curr = measure() nxt = measure(dir) if curr > nxt: swap(dir) loc_last_swap = cy move(dir) cy += 1 right = loc_last_swap dir = u_turn(dir) if dir == South: cy = get_pos_y() loc_last_swap = right while cy > left: curr = measure() nxt = measure(dir) if curr < nxt: swap(dir) loc_last_swap = cy move(dir) cy -= 1 left = loc_last_swap dir = u_turn(dir) goto_y(prev_y) return True def _subtask_plant_eastwards(): # Plant one full row, moving east ws steps (wraps back to start col) for _ in range(ws): plant(Entities.Cactus) move(East) return True def plant_cacti(): # """Plant cactus on every tile; parallelism handled inside drone_AA_task.""" recompute_parallelism() if not can_pay_for(Entities.Cactus, ws * ws): print("Cannot pay for Cactus, ws*ws") return False return drone_naive_AA_task(_subtask_plant_eastwards, North) def maintask(): #"""Plant, row shakersort sweep, column shakersort sweep, harvest.""" recompute_parallelism() if not plant_cacti(): return False # Shaker rows (ws passes north) if not drone_naive_AA_task(shake_row, North): return False goto((0, 0)) # Shaker cols (ws passes east) if not drone_naive_AA_task(shake_col, East): return False goto((0, 0)) harvest() return True if __name__ == "__main__": clear() till_all() while maintask(): continue Cactus (TODO Parallelization Test Improvement) Test script Here is code to empirically test my hypothesis that multiple shakersort task instances on one row will not interfere with each other due to three properties: How the non-greedy limit-updating functionality operates inside shakersort Shakersort is stateless w.r.t. the cactus value the drone is currently "holding" Cactus-swaps are atomic operations. However I'm still investigating if there are any potential race conditions. from utils_list import * from utils_drone import * from utils_grow import * from utils_farm import * from utils_move import * from utils_item import * if __name__ == "__main__": clear() for i in range(ws): till() plant(Entities.Cactus) move(East) def _factory(args_tuple): def shake_row(): # Shakersort the current row; uses goto_x to anchor & restore. prev_x = get_pos_x() goto_x(0) left = 0 right = ws - 1 dir = East while left < right: if dir == East: cx = get_pos_x() loc_last_swap = left while cx < right: curr = measure() nxt = measure(dir) if curr > nxt: swap(dir) loc_last_swap = cx move(dir) cx += 1 right = loc_last_swap dir = u_turn(dir) if dir == West: cx = get_pos_x() loc_last_swap = right while cx > left: curr = measure() nxt = measure(dir) if curr < nxt: swap(dir) loc_last_swap = cx move(dir) cx -= 1 left = loc_last_swap dir = u_turn(dir) goto_x(prev_x) return True return shake_row args = [] # If swaps are truly atomic there # will be no use for delaying. # base_delay = 1000 base_delay = 0 for i in range(5): args.append((i*base_delay, )) drone_task(_factory, ) Cactus (Old) Old Code for Shakersort (pre-parallelization era) # Strategy: double shakersort from utils_list import * from utils_item import * from utils_move import * ws = get_world_size() def till_all(): for i in range(ws): for j in range(ws): if get_ground_type() != Grounds.Soil: till() move(East) move(North) def plant_cacti(): if not can_pay_for(Entities.Cactus, ws*ws): print("Cannot pay for Cactus, ws*ws") return False for i in range(ws): for j in range(ws): plant(Entities.Cactus) move(East) move(North) return True def shake_row(): # Shakersort the row. goto_x(0) left = 0 right = ws - 1 dir = East while left < right: if dir == East: cx = get_pos_x() loc_last_swap = left while cx < right: curr = measure() next = measure(dir) if curr > next: swap(dir) loc_last_swap = cx move(dir) cx += 1 right = loc_last_swap dir = u_turn(dir) if dir == West: cx = get_pos_x() loc_last_swap = right while cx > left: curr = measure() next = measure(dir) if curr < next: swap(dir) loc_last_swap = cx move(dir) cx -= 1 left = loc_last_swap dir = u_turn(dir) def shake_col(): # Shakersort the column. goto_y(0) left = 0 right = ws - 1 dir = North while left < right: if dir == North: cy = get_pos_y() loc_last_swap = left while cy < right: curr = measure() next = measure(dir) if curr > next: swap(dir) loc_last_swap = cy move(dir) cy += 1 right = loc_last_swap dir = u_turn(dir) if dir == South: cy = get_pos_y() loc_last_swap = right while cy > left: curr = measure() next = measure(dir) if curr < next: swap(dir) loc_last_swap = cy move(dir) cy -= 1 left = loc_last_swap dir = u_turn(dir) if __name__ == "__main__": clear() till_all() while True: if not plant_cacti(): break for i in range(ws): shake_row() move(North) goto((0, 0)) for j in range(ws): shake_col() move(East) goto((0, 0)) harvest() Naive Make two n**2 bubble passes, one rowwise and one colwise. Sorry BylliGoat for the passing shade but watching the drone do massive passes on the 22x22 field while there were only two tiny pockets of unsorted cacti remaining made me kinda mildly annoyed. I guess it's my fault for choosing to use a n**2 solution in the first place instead of making my own. I got too lazy and just chose the first cactus steam guide I saw and didnt notice there was one right beside it that implemented gnomesort instead. Sunflower (Parallelization, uses Factory) The following two parallelized versions of my solution to the sunflower problem use an algorithm that manually finds the max sunflowers. Additional implementation idea I think that unironically, a naive parallelization approach that persists drones between bucket passes will be faster. Here is the overview: These strategies below work better with "short" parallelization strategy because each subtask is uniform in length and there is little usage respawning drones while the overall job is underway. If ws = max_drones(): - Spawn drones on a column and wait for them to line up and wait for all flowers to grow. - Make 8 passes (for each petal value) using each drone for its row, at the same time. - As soon as one pass is done you can immediately proceed to the next petal value and make a pass. My algorithm needs to respawn drones, which is inefficient, but it looks funny. if ws != max_drones(): - Plant the field. No need to keep track. - Spawn drones and arrange their starting_positions as 1d board indices spaced according to pass_length = ws // max_drones() + 1. - For each drone, traverse the field indices (1d) by pass_length times using goto(deserialize_2d_index(field_idx += 1)). Then harvest any sunflowers of level 15. - Repeat passes for decreasing petal values. I leave this as an exercise to the reader because I'm too lazy Notes After figuring out a suitable drone_task function that can take **args in a convoluted way, I revised my first parallelization attempt into the following code. Noting the factory version subtasks, they are not mathematically closed w.r.t. starting position because the location in which drones are spawned does not matter: The overall task time will be the same no matter the spawning location. This is because the game board is a torus / infinite tiling, and there will always exist a "worst distance" of worldsize taxicab dist away from your current position. I return to (0, 0) after the overall task anyways. The subtask, defined as "going to and harvesting at positions defined in a queue" orchestrated per petal value, does not benefit from being closed due to the above point; no matter the spawning point of drones for the next petal value, there will always exist a worst distance. In fact they benifit because subtask drones end their process faster due to not requiring a return to original position. p_bucket_sf_factory # Version that uses the drone_task function I wrote in utils_drone. from utils_list import * from utils_item import * from utils_move import * from utils_drone import * from utils_farm import * BUCKET_MIN = 7 BUCKET_MAX = 15 NUM_BUCKETS = BUCKET_MAX - BUCKET_MIN + 1 def b_idx(v): return v - BUCKET_MIN def _factory_plant_row(args_tuple): # args_tuple: (row_index,) row_index = args_tuple[0] def job(): start = get_pos() goto((0, row_index)) record = [] for _ in range(ws): plant(Entities.Sunflower) record.append(measure()) move(East) goto(start) return record return job def plant_sf(): recompute_parallelism() args = [] i = 0 while i < ws: args.append((i,)) i += 1 results = drone_task(_factory_plant_row, args) if results == None: return False field = [] i = 0 while i < ws: field.append(results[i]) i += 1 return field def _factory_harvest_worker(args_tuple): # args_tuple: (positions_list,) positions = args_tuple[0] def worker(): q_to_visit = positions head = 0 while head < len(q_to_visit): next = q_to_visit[head] head += 1 goto(deserialize_2d_index(next, ws)) if get_entity_type() == Entities.Sunflower: if can_harvest(): harvest() else: q_to_visit.append(next) return True return worker def harvest_sf(buckets): recompute_parallelism() # SERIAL (no parallelism) if parallelism_type == "none": v = BUCKET_MAX while v >= BUCKET_MIN: k = b_idx(v) cbucket = buckets[k] if len(cbucket) > 0: _factory_harvest_worker((cbucket))() v -= 1 return True # PARALLEL (short/long) v = BUCKET_MAX while v >= BUCKET_MIN: k = b_idx(v) cbucket = buckets[k] if len(cbucket) > 0: workers = max_drones() if workers < 1: workers = 1 if parallelism_type == "long" and workers > 1: workers -= 1 # Use unsafe matrix split for spatial locality # TODO: I need a better heuristic for # locality split. parts = deserialize_2d_matrix_unsafe_CC(cbucket, workers) args = [] i = 0 while i < len(parts): if len(parts[i]) > 0: args.append((parts[i],)) i += 1 if len(args) > 0: results = drone_task(_factory_harvest_worker, args) if results == None: return False i = 0 while i < len(results): if not results[i]: return False i += 1 v -= 1 return True def calculate_buckets_from_field_2d(field_2d): buckets = [] for _ in range(NUM_BUCKETS): buckets.append([]) field = serialize_2d_matrix(field_2d) for i in range(len(field)): buckets[b_idx(field[i])].append(i) return buckets if __name__ == "__main__": clear() till_all() while True: goto((0, 0)) field_2d = plant_sf() if not field_2d: break buckets = calculate_buckets_from_field_2d(field_2d) if not harvest_sf(buckets): break Sunflower (Parallelization, doesn't use Factory) You should note that funnily enough, I wrote this version before even considering the naive version of parallelization, which would be to spawn drones along a column, and have them traverse each row and harvest the maximum (15), and then repeat that for each sunflower petal value in decreasing order. ... Am I actually the naive one here? In addition, I also wrote this before I figured out my parallelization factory function strategy to take care of inputs into subtasks. p_bucket_sf from utils_list import * from utils_item import * from utils_move import * from utils_drone import * # TODO figure out a way to persist drones through buckets while # letting them wait for other drones to finish their portion of bucket. # The overhead is quite minimal, but we can still save time there. BUCKET_MIN = 7 BUCKET_MAX = 15 NUM_BUCKETS = BUCKET_MAX - BUCKET_MIN + 1 def b_idx(v): return v - BUCKET_MIN def _subtask_till_eastwards(): # Till one full row, moving east ws steps (wraps back to start col) for _ in range(ws): if get_ground_type() != Grounds.Soil: till() move(East) return True def till_all(): # """Till every row once; parallelism handled inside drone_AA_task.""" recompute_parallelism() return drone_naive_AA_task(_subtask_till_eastwards, North) def _subtask_plant_sf_eastwards(): record = [] for _ in range(ws): plant(Entities.Sunflower) record.append(measure()) move(East) return record def plant_sf(): # TODO: Take care of non-bool outputs. Here is custom version # for the simple usecase of sunflowers. recompute_parallelism() dir = North length = ws field = []# 2d matrix for _ in range(length): field.append([]) _drones = []# collection of <drone> objects _drones_idx = [] # index w.r.t. field if parallelism_type == "none": # Does not account for drone parallelism. for i in range(length): res = _subtask_plant_sf_eastwards() if not res: return False field[i] = res move(dir) return field elif parallelism_type == "short": # Uses the main drone as a worker. for i in range(length): subtask_result_watcher = spawn_drone(_subtask_plant_sf_eastwards) if subtask_result_watcher: # Lists don't support inserting at uninitialized indices. # Fix with dict. _drones.append(subtask_result_watcher) _drones_idx.append(i) else: res = _subtask_plant_sf_eastwards() if not res: return False field[i] = res# Directly insert into field. there is no # <drone> to wait for. move(dir) while _drones_idx: idx = _drones_idx.pop() res = _drones.pop() res = wait_for(res) if not res: return False field[idx] = res return field elif parallelism_type == "long": counter, limit = 0, length while counter < limit: if len(_drones) < max_drones(): subtask_result_watcher = spawn_drone(_subtask_plant_sf_eastwards) if subtask_result_watcher: _drones.insert(0, subtask_result_watcher) _drones_idx.insert(0, counter) counter += 1 move(dir) else: # n_drones under capacity but spawn_drone() failed. quick_print("Error: unexpected spawn_drone() failure") return False if len(_drones) >= max_drones(): idx = _drones_idx.pop()# TODO detect the first # exited drone instead of first-job-started drone res = _drones.pop() res = wait_for(res) if not res: return False field[idx] = res while _drones: idx = _drones_idx.pop() res = _drones.pop() res = wait_for(res) if not res: return False field[idx] = res return field else: print("Error: Unknown parallelism type:", parallelism_type) return False def _factory_harvest_worker(positions): # This function constructs zero-arg functions for use in spawn_drone. # For more information, look up the Factory coding design pattern. # Args: # positions: list of serialized 2d indices. # Returns: # function: zero-arg ready-to-deploy worker def worker(): q_to_visit = positions head = 0 while head < len(q_to_visit): next = positions[head] head += 1 goto(deserialize_2d_index(next, ws)) if get_entity_type() == Entities.Sunflower: if can_harvest(): harvest() else: q_to_visit.append(next)# requeue: still growing return True return worker def harvest_sf(buckets): # Consider variable buckets destroyed/unusable after use. recompute_parallelism() _drones = [] if parallelism_type == "none": v = BUCKET_MAX while v >= BUCKET_MIN: k = b_idx(v) cbucket = buckets[k] if len(cbucket) > 0: _factory_harvest_worker(cbucket)() v -= 1 return True elif parallelism_type == "short": cbucket_counter = BUCKET_MAX while cbucket_counter >= BUCKET_MIN: if len(buckets[b_idx(cbucket_counter)]): # Partitions should contain locally close # positions according to deserialization logic. partition = deserialize_2d_matrix_unsafe_CC(buckets[b_idx(cbucket_counter)], max_drones()) for i in range(len(partition)): positions = partition[i] subtask_watcher = spawn_drone(_factory_harvest_worker(positions)) if subtask_watcher: _drones.append(subtask_watcher) else: if not _factory_harvest_worker(positions)(): return False while _drones: if not wait_for(_drones.pop()): return False else: # Nothing in this bucket. pass cbucket_counter -= 1 return True elif parallelism_type == "long": cbucket_counter = BUCKET_MAX while cbucket_counter >= BUCKET_MIN: if len(buckets[b_idx(cbucket_counter)]): # Partitions should contain locally close # positions according to deserialization logic. partition = deserialize_2d_matrix_unsafe_CC(buckets[b_idx(cbucket_counter)], max_drones() - 1) counter, limit = 0, len(partition) while counter < limit: positions = partition[counter] if len(_drones) < max_drones(): subtask_watcher = spawn_drone(_factory_harvest_worker(positions)) if subtask_watcher: _drones.insert(0, subtask_watcher) counter += 1 else: # n_drones under capacity but spawn_drone() failed. quick_print("Error: unexpected spawn_drone() failure") return False if len(_drones) >= max_drones(): if not wait_for(_drones.pop()): return False while _drones: if not wait_for(_drones.pop()): return False else: # Nothing in this bucket. pass cbucket_counter -= 1 return True else: print("Error: Unknown parallelism type:", parallelism_type) return False def calculate_buckets_from_field_2d(field_2d): buckets = [] for _ in range(NUM_BUCKETS): buckets.append([]) field = serialize_2d_matrix(field_2d) for i in range(len(field)): buckets[b_idx(field[i])].append(i) return buckets if __name__ == "__main__": clear() till_all() while True: goto((0, 0)) field_2d = plant_sf() buckets = calculate_buckets_from_field_2d(field_2d) harvest_sf(buckets) Sunflower (Old) Uses a bucket system to keep track of sunflowers of various petal number values. The corner case only applies usually to value 15 sunflowers as they are still growing while the drone switches into harvesting mode. It starts backwards due to .pop() and there will be a little overhead. bucket_sunflowers from utils_list import * from utils_move import * BUCKET_MIN = 7 BUCKET_MAX = 15 NB = BUCKET_MAX - BUCKET_MIN + 1 def bidx(v): return v - BUCKET_MIN clear() ws = get_world_size() # Need ♥♥♥♥♥♥ list comprehension # buckets = [[] for _ in range(NB)] buckets = [] for _ in range(NB): buckets.append([]) remaining = 0 def replant_sf(): global buckets global remaining for k in range(NB): buckets[k] = [] remaining = 0 goto((0, 0)) for i in range(ws): for j in range(ws): if get_ground_type() != Grounds.Soil: till() plant(Entities.Sunflower) pos = serialize_2d_index((j, i), ws) v = measure() buckets[bidx(v)].append(pos) remaining += 1 move(East) move(North) while True: replant_sf() while remaining > 0: progressed = False # Descend from 15 -> 7, drain each bucket this pass. for v in range(BUCKET_MAX, BUCKET_MIN - 1, -1): k = bidx(v) b = buckets[k] while b: pos = b.pop() goto(deserialize_2d_index(pos, ws)) if can_harvest(): harvest() remaining -= 1 progressed = True else: # Corner-case: push deferred item to # front of the bucket # (acceptable O(n) cost, small window). buckets[k].insert(0, pos) naive_sunflowers Bad code that uses an expensive max-finding function. from utils_list import * from utils_move import * clear() ws = get_world_size() arr_sf = list(range(ws * ws)) # random def replant_sf(): global arr_sf goto((0, 0)) for i in range(ws): for j in range(ws): if get_ground_type() != Grounds.Soil: till() plant(Entities.Sunflower) arr_sf[serialize_2d_index((j, i), ws)] = measure() move(East) move(North) replant_sf() while True: for _ in range(ws * ws): val, idx = max_with_index(arr_sf) goto(deserialize_2d_index(idx, ws)) while not can_harvest(): pass harvest() cx = get_pos_x() cy = get_pos_y() arr_sf[serialize_2d_index((cx, cy), ws)] = 0 replant_sf() Maze (Parallelization Cheese) I have implemented the hilarious cheese strategy detailed in the below guide: https://steamcommunity.com/sharedfiles/filedetails/?id=3588071660 This strategy makes use of the fact that we have more drones available that there are possible maze tiles, for certain combos of worldsize and max_drones. We can cheese the maze-solving problem since we are not constrained to using one drone at a time. We can check every tile in the maze immediately using many drones at once, instead of expensively traversing it using a single drone. In light of this new code I've made, I also added a flag to my onecrop polyculture code to enable farming Weird Substance. # Consumes approximately 25k Weird Substance per second. # Might have a bug with replanting a separate maze # while the current 300-maze run is in progress. I suspect # is due to THRESH_SOLVED but since there is barely any # overhead, and because gold scales linearly with maze # regeneration, I will ignore this bug for now. from utils_list import * from utils_drone import * from utils_grow import * from utils_farm import * from utils_move import * from utils_item import * MANUAL_WORLD_SIZE = 5# Don't change this. THRESH_SOLVED = 3# Consider 300 mazes over when applying # this number of weird substance without gold location changing. THRESH_NO_MAZE = 50# Number of times to check for lack of maze to # consider the _subtask_worker job finished. COST_MAZE = MANUAL_WORLD_SIZE * 2**(num_unlocked(Unlocks.Mazes) - 1) def stop_condition(): return num_items(Items.Gold) > 100000000 def _factory_maze(args_tuple): # args_tuple: (start_pos_1d,) # If the start pos is outside of the board, consider it a maze_starter worker. start_pos_1d = args_tuple[0] if start_pos_1d >= MANUAL_WORLD_SIZE**2: def _subtask_maze_starter(): # optionally delay here? while True: if stop_condition(): break if get_entity_type() not in (Entities.Hedge, Entities.Treasure): plant(Entities.Bush) use_item(Items.Weird_Substance, COST_MAZE) return True return _subtask_maze_starter else: start_pos = deserialize_2d_index(start_pos_1d, MANUAL_WORLD_SIZE) def _subtask_worker(): goto(start_pos) while get_entity_type() not in (Entities.Hedge, Entities.Treasure): pass# Wait for maze to be initialized while True: counter = 0 if stop_condition(): break while get_entity_type() == Entities.Treasure: ploc_treasure = get_pos() use_item(Items.Weird_Substance, COST_MAZE) cloc_treasure = measure() if ploc_treasure == cloc_treasure: counter += 1 if counter >= THRESH_SOLVED: harvest() while get_entity_type() not in (Entities.Hedge, Entities.Treasure): # A worker drone has finished and harvested the maze counter += 1 if counter >= THRESH_NO_MAZE: return True return True return _subtask_worker def main(): total_tiles = MANUAL_WORLD_SIZE**2 drone_cap = max_drones_safe() if total_tiles > drone_cap: quick_print("Exception: Cannot cover the board with drones.") return if not check_has_item(Items.Weird_Substance, COST_MAZE * 300): quick_print("Exception: You don't have enough Weird Substance") return if stop_condition(): quick_print("Stop condition already met.") return clear() set_world_size(MANUAL_WORLD_SIZE) args = [] for i in range(MANUAL_WORLD_SIZE**2 + 1): args.append((i,)) drone_task(_factory_maze, args) if __name__ == "__main__": main()# Using this notation because I need to raise exception # JIC, then return None Other Maze Naive Search or Maze Parallelization Search Unfortunately, I don't have a naive mazerunner for you. When I was playing the early game I had max multiplier on maze output becuase I farmer Weird Substance, and I just took a random BFS/DFS script off someone else while I coded by sunflower (parallelization) code. Here is the source I used: https://steamcommunity.com/sharedfiles/filedetails/?id=3408470559 Dino Unfortunately there is no parallelization for farming bones since only one drone can wear the dino hat. You'll need a better dino script that can compute shortcuts between a given optimal snake path, because I used someone else's script that tediously runs the same board-covering pattern for like 7 minutes per run while coding my shakersort parallelization modification. Here is the source I used: https://steamcommunity.com/sharedfiles/filedetails/?id=3585506194 Misc Stack Overflow Quite classic. def recur(): recur() if __name__ == "__main__": recur() Wrong Order Simply invert the sorting condition of the algorithm, from lessthan to greaterthan (and conversely, for the reverse pass of shakersort). Fashion Show Left as an exercise to the reader. Infinite Loop f0 from f1 import * f1 from f0 import * Then run either file. Pet the Piggy Come on who hasn't run this command the moment they saw it Outro Please share your code in a steam guide or discussion as well. I'm lazy and want drop-in code for maze-finding. Also automation and meta-optimization.